Appearance
Generator 异步应用
JavaScript 语言的执行环境是“单线程”的,如果没有异步编程,每次耗时操作都会阻塞程序。 ES6 之前,异步编程的方法,大概有下面四种
- 回调函数
- 事件监听
- 发布/订阅
- Promise 对象
Generator 函数将 JavaScript 异步编程带入 全新阶段
基本概念
异步
异步:一个任务不连续完成。该任务被人为分成两段,先执行第一段,然后转而执行其他任务,等做好了准备,再回过头执行第二段
同步:连续的执行。同步任务要等待上一个任务执行完成,才会执行下一个任务。会阻塞
回调函数
回调函数:第二段任务写在回调函数,当重新执行时,直接调用这个函数。
疑问:为什么 Node 约定,回调函数的第一个参数,必须是错误对象 err 原因:执行分成两段,第一段执行完以后,任务所在的上下文环境就已经结束。这以后抛出的错误,原来的上下文环境已经无法捕捉,只能当作参数,传入第二段
Promise
两个以上的异步操作,会出现多重嵌套,代码横向发展。多个异步操作强耦合,一个操作修改可能连带要修改多个操作(回调地狱)。
js
fs.readFile(fileA, 'utf-8', function (err, data) {
fs.readFile(fileB, 'utf-8', function (err, data) {
// ...
})
})Promise:为了解决回调地狱。将嵌套的回调函数,改为链式调用
js
var readFile = require('fs-readfile-promise')
readFile(fileA)
.then(function (data) {
console.log(data.toString())
})
.then(function () {
return readFile(fileB)
})
.then(function (data) {
console.log(data.toString())
})
.catch(function (err) {
console.log(err)
})Promise 问题:代码冗余,通过 Promise 包装后,不易理解(全是 then)。
Gemerator 函数
协程
早期异步方案中,有一种协程。通过多个线程协作,完成异步任务。
运行流程
- 协程 A 开始执行
- 协程 A 执行到一半,进入暂停,执行权转移到协程 B
- (一段时间后)协程 B 交还执行权
- 协程 A 恢复执行。
js
function* asyncJob() {
// ...其他代码
var f = yield readFile(fileA)
// ...其他代码
}yield 命令是异步两个阶段的分界线。协程遇到 yield 命令就暂停,等到执行权返回,再从暂停的地方继续往后执行。
协程的 Generator 函数实现
Generator 函数是协程在 ES6 的实现,最大特点:交出函数的执行权(即暂停执行)
Generator 函数:一个异步任务容器,通过 yield 暂停任务,再通过 next 继续执行
函数数据交换 和 错误处理
Generator 函数可以暂停执行和恢复执行,这是它能封装异步任务的根本原因
还有两个特性,使它可以作为异步编程的完整解决方案:函数体内外的数据交换和错误处理机制
函数体内外的数据交换
next 返回值:向外输出数据
next 接受参数:向 Generator 函数体内 输入数据
js
function* gen(x) {
var y = yield x + 2
return y
}
var g = gen(1)
g.next() // { value: 3, done: false }
g.next(2) // { value: 2, done: true }错误处理机制
Generator 内部部署错误处理代码,捕获函数体外抛出的错误。处理错误和出错代码,进行了时间和空间的分离
js
function* gen(x) {
try {
var y = yield x + 2
} catch (e) {
console.log(e)
}
return y
}
var g = gen(1)
g.next()
g.throw('出错了')
// 出错了异步任务的封装
使用 Generator 函数,执行一个真实的异步任务
js
var fetch = require('node-fetch')
function* gen() {
var url = 'https://api.github.com/users/github'
var result = yield fetch(url)
console.log(result.bio)
}js
var g = gen()
var result = g.next()
result.value
.then(function (data) {
return data.json()
})
.then(function (data) {
g.next(data)
})@DIF Generator 函数将异步操作表示得很简洁,但是流程管理却不方便(即何时执行第一阶段、何时执行第二阶段)
Thunk 函数
自动执行 Generator 的一种方法
参数求值策略
求值策略:即函数的参数到底应该何时求值
两种方式
js
var x = 1
function f(m) {
return m * 2
}
f(x + 5)- 传值调用(call by value):即在进入函数体之前,就计算 x + 5 的值。在调用前求职,可能造成性能损失
js
f(x + 5)
// 传值调用时,等同于
f(6)- 传名调用(call by name):即直接将表达式 x + 5 传入函数体,只在用到它的时候求值
js
f(x + 5)(
// 传名调用时,等同于
x + 5
) * 2Thunk 函数意义
传名调用实现:将参数放到一个临时函数,再将这个临时函数传入函数体。这个临时函数就叫做 Thunk 函数
js
function f(m) {
return m * 2
}
f(x + 5)
// 等同于
var thunk = function () {
return x + 5
}
function f(thunk) {
return thunk() * 2
}JavaScript 中 Thunk 函数
Thunk 函数是传名调用的一种实现策略,用来替换某个表达式。在 JavaScript 语言中,Thunk 函数替换的不是表达式。而是将多参数函数,替换成一个只接受回调函数作为参数的单参数函数
js
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback)
// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
return function (callback) {
return fs.readFile(fileName, callback)
}
}
var readFileThunk = Thunk(fileName)
readFileThunk(callback)任何函数,只要参数有回调函数,就能写成 Thunk 函数的形式
js
// ES5版本
var Thunk = function (fn) {
return function () {
// 类数组转为数组
var args = Array.prototype.slice.call(arguments)
return function (callback) {
args.push(callback)
return fn.apply(this, args)
}
}
}
// ES6版本
const Thunk = function (fn) {
return function (...args) {
return function (callback) {
return fn.call(this, ...args, callback)
}
}
}第一层封装参数,第二层封装函数。上面例子的两个用法
js
var readFileThunk = Thunk(fs.readFile)
readFileThunk(fileA)(callback)js
function f(a, cb) {
cb(a)
}
const ft = Thunk(f)
ft(1)(console.log) // 1Thunkify 模块
生产环境的转换器,建议使用 Thunkify 模块
安装
bash
npm install thunkify使用方法
js
var thunkify = require('thunkify')
var fs = require('fs')
var read = thunkify(fs.readFile)
read('package.json')(function (err, str) {
// ...
})Thunkify 源码
called 保证 回调函数只运行一次
js
function thunkify(fn) {
return function () {
var args = new Array(arguments.length)
var ctx = this
for (var i = 0; i < args.length; ++i) {
args[i] = arguments[i]
}
return function (done) {
var called
args.push(function () {
if (called) return
called = true
done.apply(null, arguments)
})
try {
fn.apply(ctx, args)
} catch (err) {
done(err)
}
}
}
}js
function f(a, b, callback) {
var sum = a + b
callback(sum)
callback(sum)
}
var ft = thunkify(f)
// https://www.tjvantoll.com/2015/12/29/console-error-bind/
// 要求console中this必须指向console对象,目前测试没有这个问题
var print = console.log.bind(console)
ft(1, 2)(print)
// 3Generator 函数的流程管理
Thunk 函数可以用于 Generator 函数的自动流程管理
js
function* gen() {
// ...
}
var g = gen()
var res = g.next()
while (!res.done) {
console.log(res.value)
res = g.next()
}问题:不适合异步操作。如果必须保证前一步执行完,才能执行后一步,上面的自动执行就不可行。
解决:Thunk 函数,因为可以在回调函数,将执行权交还 Generator 函数
js
var fs = require('fs')
var thunkify = require('thunkify')
var readFileThunk = thunkify(fs.readFile)
var gen = function* () {
var r1 = yield readFileThunk('/etc/fstab')
console.log(r1.toString())
var r2 = yield readFileThunk('/etc/shells')
console.log(r2.toString())
}手动执行:将同一回调函数传入 r1.value,r2.value 中
js
var g = gen()
var r1 = g.next()
r1.value(function (err, data) {
if (err) throw err
// 将回调中读取的data传到外部
var r2 = g.next(data)
r2.value(function (err, data) {
if (err) throw err
g.next(data)
})
})Thunk 函数的自动流程管理
Thunk 函数真正的威力,在于可以自动执行 Generator 函数
js
// 封装上面的嵌套写法
function run(fn) {
var gen = fn()
function next(err, data) {
var result = gen.next(data)
if (result.done) return
result.value(next)
}
next()
}
function* g() {
// ...
}
run(g)run 函数,就是一个 Generator 函数的自动执行器。通过判断 done 来确定是否终止。如果未终止,将 next 函数(thunk 函数)通过递归传递。
使用
异步操作可以写得像同步操作,而且一行代码就可以执行
js
var g = function* () {
var f1 = yield readFileThunk('fileA')
var f2 = yield readFileThunk('fileB')
// ...
var fn = yield readFileThunk('fileN')
}
run(g)Thunk 函数并不是 Generator 函数自动执行的唯一方案。
因为自动执行的关键:必须有一种机制,自动控制 Generator 函数的流程,接收和交还程序的执行权。回调函数,Promise 对象都可以
co 模块
co 模块:用于 Generator 函数的自动执行。
示例:依次读取两个文件
js
var gen = function* () {
var f1 = yield readFile('/etc/fstab')
var f2 = yield readFile('/etc/shells')
console.log(f1.toString())
console.log(f2.toString())
}co 模块可以直接执行 Generator 函数,返回一个 Promise 对象
js
var co = require('co')
co(gen).then(function () {
console.log('Generator 函数执行完成')
})自动执行原理
Generator 是一个异步操作容器。自动执行需要一种机制,在异步操作有结果时,能够自动交回执行权
通过以下两种方法
- 回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权。
- Promise 对象。将异步操作包装成 Promise 对象,用 then 方法交回执行权
co 模块是将 回调函数 和 Promise 对象结合。
co 源码
源码分析
- 接受 Generator 函数作为参数,返回一个 Promise 对象
- 判断 参数是否为 Generator 函数
- 是,执行
- 否,终止,resolve 这个参数
- 将 next 方法包装,便于错误捕获
- next 函数(关键),调用自身
js
function co(gen) {
var ctx = this
return new Promise(function (resolve, reject) {
if (typeof gen === 'function') gen = gen.call(ctx)
if (!gen || typeof gen.next !== 'function') return resolve(gen)
onFulfilled()
function onFulfilled(res) {
var ret
try {
ret = gen.next(res)
} catch (e) {
return reject(e)
}
next(ret)
}
})
}next 函数解析
- 循环终止判断
- 保证 返回值为 Promise 对象
- 通过 then 方法回调,并传递参数
- 参数不满足抛出错误(非 Thunk 函数和 Promise 对象)
js
function next(ret) {
if (ret.done) return resolve(ret.value)
var value = toPromise.call(ctx, ret.value)
if (value && isPromise(value)) return value.then(onFulfilled, onRejected)
return onRejected(
new TypeError(
'You may only yield a function, promise, generator, array, or object, ' +
'but the following object was passed: "' +
String(ret.value) +
'"'
)
)
}处理并发异步
co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下`一步
js
// 数组的写法
co(function* () {
var res = yield [Promise.resolve(1), Promise.resolve(2)]
console.log(res)
}).catch(onerror)
// 对象的写法
co(function* () {
var res = yield {
1: Promise.resolve(1),
2: Promise.resolve(2)
}
console.log(res)
}).catch(onerror)
// 遍历
co(function* () {
var values = [n1, n2, n3]
yield values.map(somethingAsync)
})
function* somethingAsync(x) {
// do something async
return y
}实例:处理 Stream
Node 提供 Stream 模式读写数据,一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。 Stream 模式使用 EventEmitter API,会释放三个事件
data:下一块数据准备好 end:数据处理完成 error:发生错误
js
const co = require('co')
const fs = require('fs')
const stream = fs.createReadStream('./les_miserables.txt')
let valjeanCount = 0
co(function* () {
while (true) {
// 将最先发生的事件抛出
const res = yield Promise.race([
new Promise((resolve) => stream.once('data', resolve)),
new Promise((resolve) => stream.once('end', resolve)),
new Promise((resolve, reject) => stream.once('error', reject))
])
if (!res) {
break
}
stream.removeAllListeners('data')
stream.removeAllListeners('end')
stream.removeAllListeners('error')
valjeanCount += (res.toString().match(/valjean/gi) || []).length
}
console.log('count:', valjeanCount) // count: 1120
})