Skip to content

Generator 异步应用

JavaScript 语言的执行环境是“单线程”的,如果没有异步编程,每次耗时操作都会阻塞程序。 ES6 之前,异步编程的方法,大概有下面四种

  • 回调函数
  • 事件监听
  • 发布/订阅
  • Promise 对象

Generator 函数将 JavaScript 异步编程带入 全新阶段

基本概念

异步

异步:一个任务不连续完成。该任务被人为分成两段,先执行第一段,然后转而执行其他任务,等做好了准备,再回过头执行第二段

同步:连续的执行。同步任务要等待上一个任务执行完成,才会执行下一个任务。会阻塞

回调函数

回调函数:第二段任务写在回调函数,当重新执行时,直接调用这个函数。

疑问:为什么 Node 约定,回调函数的第一个参数,必须是错误对象 err 原因:执行分成两段,第一段执行完以后,任务所在的上下文环境就已经结束。这以后抛出的错误,原来的上下文环境已经无法捕捉,只能当作参数,传入第二段

Promise

两个以上的异步操作,会出现多重嵌套,代码横向发展。多个异步操作强耦合,一个操作修改可能连带要修改多个操作(回调地狱)。

js
fs.readFile(fileA, 'utf-8', function (err, data) {
  fs.readFile(fileB, 'utf-8', function (err, data) {
    // ...
  })
})

Promise:为了解决回调地狱。将嵌套的回调函数,改为链式调用

js
var readFile = require('fs-readfile-promise')

readFile(fileA)
  .then(function (data) {
    console.log(data.toString())
  })
  .then(function () {
    return readFile(fileB)
  })
  .then(function (data) {
    console.log(data.toString())
  })
  .catch(function (err) {
    console.log(err)
  })

Promise 问题:代码冗余,通过 Promise 包装后,不易理解(全是 then)。

Gemerator 函数

协程

早期异步方案中,有一种协程。通过多个线程协作,完成异步任务。

运行流程

  1. 协程 A 开始执行
  2. 协程 A 执行到一半,进入暂停,执行权转移到协程 B
  3. (一段时间后)协程 B 交还执行权
  4. 协程 A 恢复执行。
js
function* asyncJob() {
  // ...其他代码
  var f = yield readFile(fileA)
  // ...其他代码
}

yield 命令是异步两个阶段的分界线。协程遇到 yield 命令就暂停,等到执行权返回,再从暂停的地方继续往后执行。

协程的 Generator 函数实现

Generator 函数是协程在 ES6 的实现,最大特点:交出函数的执行权(即暂停执行)

Generator 函数:一个异步任务容器,通过 yield 暂停任务,再通过 next 继续执行

函数数据交换 和 错误处理

Generator 函数可以暂停执行和恢复执行,这是它能封装异步任务的根本原因

还有两个特性,使它可以作为异步编程的完整解决方案:函数体内外的数据交换和错误处理机制

函数体内外的数据交换

next 返回值:向外输出数据

next 接受参数:向 Generator 函数体内 输入数据

js
function* gen(x) {
  var y = yield x + 2
  return y
}

var g = gen(1)
g.next() // { value: 3, done: false }
g.next(2) // { value: 2, done: true }

错误处理机制

Generator 内部部署错误处理代码,捕获函数体外抛出的错误。处理错误和出错代码,进行了时间和空间的分离

js
function* gen(x) {
  try {
    var y = yield x + 2
  } catch (e) {
    console.log(e)
  }
  return y
}

var g = gen(1)
g.next()
g.throw('出错了')
// 出错了

异步任务的封装

使用 Generator 函数,执行一个真实的异步任务

js
var fetch = require('node-fetch')

function* gen() {
  var url = 'https://api.github.com/users/github'
  var result = yield fetch(url)
  console.log(result.bio)
}
js
var g = gen()
var result = g.next()

result.value
  .then(function (data) {
    return data.json()
  })
  .then(function (data) {
    g.next(data)
  })

@DIF Generator 函数将异步操作表示得很简洁,但是流程管理却不方便(即何时执行第一阶段、何时执行第二阶段)

Thunk 函数

自动执行 Generator 的一种方法

参数求值策略

求值策略:即函数的参数到底应该何时求值

两种方式

js
var x = 1

function f(m) {
  return m * 2
}

f(x + 5)
  1. 传值调用(call by value):即在进入函数体之前,就计算 x + 5 的值。在调用前求职,可能造成性能损失
js
f(x + 5)
// 传值调用时,等同于
f(6)
  1. 传名调用(call by name):即直接将表达式 x + 5 传入函数体,只在用到它的时候求值
js
f(x + 5)(
  // 传名调用时,等同于
  x + 5
) * 2

Thunk 函数意义

传名调用实现:将参数放到一个临时函数,再将这个临时函数传入函数体。这个临时函数就叫做 Thunk 函数

js
function f(m) {
  return m * 2
}

f(x + 5)

// 等同于

var thunk = function () {
  return x + 5
}

function f(thunk) {
  return thunk() * 2
}

JavaScript 中 Thunk 函数

Thunk 函数是传名调用的一种实现策略,用来替换某个表达式。在 JavaScript 语言中,Thunk 函数替换的不是表达式。而是将多参数函数,替换成一个只接受回调函数作为参数的单参数函数

js
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback)

// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
  return function (callback) {
    return fs.readFile(fileName, callback)
  }
}

var readFileThunk = Thunk(fileName)
readFileThunk(callback)

任何函数,只要参数有回调函数,就能写成 Thunk 函数的形式

js
// ES5版本
var Thunk = function (fn) {
  return function () {
    // 类数组转为数组
    var args = Array.prototype.slice.call(arguments)
    return function (callback) {
      args.push(callback)
      return fn.apply(this, args)
    }
  }
}

// ES6版本
const Thunk = function (fn) {
  return function (...args) {
    return function (callback) {
      return fn.call(this, ...args, callback)
    }
  }
}

第一层封装参数,第二层封装函数。上面例子的两个用法

js
var readFileThunk = Thunk(fs.readFile)
readFileThunk(fileA)(callback)
js
function f(a, cb) {
  cb(a)
}
const ft = Thunk(f)

ft(1)(console.log) // 1

Thunkify 模块

生产环境的转换器,建议使用 Thunkify 模块

安装

bash
npm install thunkify

使用方法

js
var thunkify = require('thunkify')
var fs = require('fs')

var read = thunkify(fs.readFile)
read('package.json')(function (err, str) {
  // ...
})

Thunkify 源码

called 保证 回调函数只运行一次

js
function thunkify(fn) {
  return function () {
    var args = new Array(arguments.length)
    var ctx = this

    for (var i = 0; i < args.length; ++i) {
      args[i] = arguments[i]
    }

    return function (done) {
      var called

      args.push(function () {
        if (called) return
        called = true
        done.apply(null, arguments)
      })

      try {
        fn.apply(ctx, args)
      } catch (err) {
        done(err)
      }
    }
  }
}
js
function f(a, b, callback) {
  var sum = a + b
  callback(sum)
  callback(sum)
}

var ft = thunkify(f)
// https://www.tjvantoll.com/2015/12/29/console-error-bind/
// 要求console中this必须指向console对象,目前测试没有这个问题
var print = console.log.bind(console)
ft(1, 2)(print)
// 3

Generator 函数的流程管理

Thunk 函数可以用于 Generator 函数的自动流程管理

js
function* gen() {
  // ...
}

var g = gen()
var res = g.next()

while (!res.done) {
  console.log(res.value)
  res = g.next()
}

问题:不适合异步操作。如果必须保证前一步执行完,才能执行后一步,上面的自动执行就不可行。

解决:Thunk 函数,因为可以在回调函数,将执行权交还 Generator 函数

js
var fs = require('fs')
var thunkify = require('thunkify')
var readFileThunk = thunkify(fs.readFile)

var gen = function* () {
  var r1 = yield readFileThunk('/etc/fstab')
  console.log(r1.toString())
  var r2 = yield readFileThunk('/etc/shells')
  console.log(r2.toString())
}

手动执行:将同一回调函数传入 r1.value,r2.value 中

js
var g = gen()

var r1 = g.next()
r1.value(function (err, data) {
  if (err) throw err
  // 将回调中读取的data传到外部
  var r2 = g.next(data)
  r2.value(function (err, data) {
    if (err) throw err
    g.next(data)
  })
})

Thunk 函数的自动流程管理

Thunk 函数真正的威力,在于可以自动执行 Generator 函数

js
// 封装上面的嵌套写法
function run(fn) {
  var gen = fn()

  function next(err, data) {
    var result = gen.next(data)
    if (result.done) return
    result.value(next)
  }

  next()
}

function* g() {
  // ...
}

run(g)

run 函数,就是一个 Generator 函数的自动执行器。通过判断 done 来确定是否终止。如果未终止,将 next 函数(thunk 函数)通过递归传递。

使用

异步操作可以写得像同步操作,而且一行代码就可以执行

js
var g = function* () {
  var f1 = yield readFileThunk('fileA')
  var f2 = yield readFileThunk('fileB')
  // ...
  var fn = yield readFileThunk('fileN')
}

run(g)

Thunk 函数并不是 Generator 函数自动执行的唯一方案。

因为自动执行的关键:必须有一种机制,自动控制 Generator 函数的流程,接收和交还程序的执行权。回调函数,Promise 对象都可以

co 模块

co 模块:用于 Generator 函数的自动执行。

示例:依次读取两个文件

js
var gen = function* () {
  var f1 = yield readFile('/etc/fstab')
  var f2 = yield readFile('/etc/shells')
  console.log(f1.toString())
  console.log(f2.toString())
}

co 模块可以直接执行 Generator 函数,返回一个 Promise 对象

js
var co = require('co')

co(gen).then(function () {
  console.log('Generator 函数执行完成')
})

自动执行原理

Generator 是一个异步操作容器。自动执行需要一种机制,在异步操作有结果时,能够自动交回执行权

通过以下两种方法

  1. 回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权。
  2. Promise 对象。将异步操作包装成 Promise 对象,用 then 方法交回执行权

co 模块是将 回调函数 和 Promise 对象结合。

co 源码

源码分析

  1. 接受 Generator 函数作为参数,返回一个 Promise 对象
  2. 判断 参数是否为 Generator 函数
    1. 是,执行
    2. 否,终止,resolve 这个参数
  3. 将 next 方法包装,便于错误捕获
  4. next 函数(关键),调用自身
js
function co(gen) {
  var ctx = this

  return new Promise(function (resolve, reject) {
    if (typeof gen === 'function') gen = gen.call(ctx)
    if (!gen || typeof gen.next !== 'function') return resolve(gen)

    onFulfilled()
    function onFulfilled(res) {
      var ret
      try {
        ret = gen.next(res)
      } catch (e) {
        return reject(e)
      }
      next(ret)
    }
  })
}

next 函数解析

  1. 循环终止判断
  2. 保证 返回值为 Promise 对象
  3. 通过 then 方法回调,并传递参数
  4. 参数不满足抛出错误(非 Thunk 函数和 Promise 对象)
js
function next(ret) {
  if (ret.done) return resolve(ret.value)
  var value = toPromise.call(ctx, ret.value)
  if (value && isPromise(value)) return value.then(onFulfilled, onRejected)
  return onRejected(
    new TypeError(
      'You may only yield a function, promise, generator, array, or object, ' +
        'but the following object was passed: "' +
        String(ret.value) +
        '"'
    )
  )
}

处理并发异步

co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下`一步

js
// 数组的写法
co(function* () {
  var res = yield [Promise.resolve(1), Promise.resolve(2)]
  console.log(res)
}).catch(onerror)

// 对象的写法
co(function* () {
  var res = yield {
    1: Promise.resolve(1),
    2: Promise.resolve(2)
  }
  console.log(res)
}).catch(onerror)

// 遍历
co(function* () {
  var values = [n1, n2, n3]
  yield values.map(somethingAsync)
})

function* somethingAsync(x) {
  // do something async
  return y
}

实例:处理 Stream

Node 提供 Stream 模式读写数据,一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。 Stream 模式使用 EventEmitter API,会释放三个事件

data:下一块数据准备好 end:数据处理完成 error:发生错误

js
const co = require('co')
const fs = require('fs')

const stream = fs.createReadStream('./les_miserables.txt')
let valjeanCount = 0

co(function* () {
  while (true) {
    // 将最先发生的事件抛出
    const res = yield Promise.race([
      new Promise((resolve) => stream.once('data', resolve)),
      new Promise((resolve) => stream.once('end', resolve)),
      new Promise((resolve, reject) => stream.once('error', reject))
    ])
    if (!res) {
      break
    }
    stream.removeAllListeners('data')
    stream.removeAllListeners('end')
    stream.removeAllListeners('error')
    valjeanCount += (res.toString().match(/valjean/gi) || []).length
  }
  console.log('count:', valjeanCount) // count: 1120
})

相关链接

[-] Generator 异步应用

[-] Node.js 的 thunk 函数&柯里化

[-] 柯里化,偏函数,Thunk