美文网首页
Node.js 设计模式笔记 —— 由 Promises 和 A

Node.js 设计模式笔记 —— 由 Promises 和 A

作者: rollingstarky | 来源:发表于2022-11-12 21:13 被阅读0次

    回调函数(Callbacks)是 Node.js 中异步编程的底层构件,但它们远远达不到对用户友好的程度。对于实现代码中最常见的串行控制流,一个未经训练的开发者很容易陷入到 callback hell 问题中。即便实现是正确的,该串行控制流也会显得不必要的复杂和脆弱。

    为了获得更好的异步编程体验,第一个出现的就是 promise,一种保存了异步操作的状态和最终结果的对象。Promise 可以轻易地被串联起来,实现串行控制流,可以像其他任何对象一样自由地转移。Pormise 大大简化了异步代码,后来在此基础上又有了 asyncawait,能够令异步代码看起来就像是同步代码一样。

    Promises

    Promises 是 ECMAScript 2015 标准(ES6)的一部分,为传递异步结果提供了一种健壮的解决方案,替代原本的 CPS 样式的回调函数。Promise 能够令所有主要的异步控制流更加易读、简洁和健壮。

    Promise 是一种用来代表异步操作的最终结果(或错误)的对象。在专业术语中,当异步操作未完成时,我们称 Promise 是 pending 的;当异步操作成功结束时,Promise 是 fulfilled 的;当异步操作因为错误终止时,Promise 是 rejected 的;当 Promise 或者是 fulfilled 或者是 rejected,则将其认定为 settled

    Promise 对象的 then() 方法可以获取成功执行后的结果或者终止时报出的错误:

    promise.then(onFulfilled, onRejected)
    

    其中 onFulfilled 是一个回调函数,最终会接收到 Promise 成功时的值;onRejected是另一个回调函数,最终会接收 Promise 异常终止时的值(如果有的话)。

    基于回调函数的如下代码:

    asyncOperation(arg, (err, result) => {
      if (err) {
        // handle the error
      }
      // do stuff with the result
    })
    

    Promise 实现上述同样的功能,则更加优雅、结构化:

    asyncOperationPromise(arg)
      .then(result => {
        // do stuff with result
      }, err => {
        // handle the error
      })
    

    asyncOperationPromise() 会返回一个 Promise,可以被用来获取最终结果的值或者失败的原因。但最为关键的属性是,then() 方法会同步地返回另一个 Promise。
    更进一步地,如果 onFulfilled 或者 onRejected 函数返回一个值 x,那么 then() 方法返回的 Promise 会有以下行为:

    • x 是一个值,则 then() 返回的 Promise 使用 x 作为自身完成时的值
    • x 是一个 Promise 且成功完成,则 x 完成时返回的值作为 then() 返回的 Promise 完成时的值
    • x 是一个 Promise 且因为错误终止,则 x 终止的原因作为 then() 返回的 Promise 终止的原因

    上述行为能够令我们将多个 promise 连接成链,轻松地将异步操作聚合在一起。如果我们没有指定一个 onFulfilled 或者 onRejected handler,Promise 完成时的值或者终止时的原因都会自动地传递给链条中的下一个 Promise。通过 Promise 链,任务的执行顺序突然变得很简单。

    asyncOperationPromise(arg)
      .then(result1 => {
        // return another promise
        return asyncOperationPromise(arg2)
      })
      .then(result2 => {
        // return a value
        return 'done'
      })
      .then(undefined, err => {
        // any error in the chain is caught here
      })
    

    promise API

    Promise 构造函数(new Promise((resolve, reject) => {}))会创建一个新的 Promise 实例,其完成还是终止取决于作为参数传入的函数的行为。
    作为参数传入的函数接收如下两个参数:

    • resolve(obj):resolve 是一个函数,在调用时为 Promise 提供完成时的值。当 obj 是值时,则 obj 本身作为 Promise 完成时的值;当 obj 是另一个 Promise 时,则 obj 完成时的值作为当前 Promise 完成时的值
    • reject(err):Promise 因为 err 终止
    function delay(milliseconds) {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve(new Date())
        }, milliseconds)
      })
    }
    
    console.log(`${new Date().getSeconds()}s\nDelaying...`)
    delay(1000)
      .then(newDate => {
        console.log(`${newDate.getSeconds()}s`)
      })
    

    Promise 最重要的静态方法:

    • Promise.resolve(obj):从另一个 Promise、thenable 对象或者值创建一个新的 Promise
    • Promise.reject(err):创建一个 Promise,该 Promise 会因为 err 终止
    • Promise.all(iterable):从一个可迭代对象创建 Promise,若该 iterable 中的每一项都提供了一个 fulfill 值,则 Promise 最终以包含这些值的列表作为 fulfill 值;若其中有任意一项 reject,则 Promise.all() 返回的 Promise 以第一个 reject 的 err 终止
    • Promise.allSettled(iterable):此方法会等待所有输入的 Promise 或者 fulfill 或者 reject,之后返回一个包含所有 fulfill 值和 reject 原因的列表
    • Promise.race(iterable):返回可迭代对象中第一个 fulfill 或 reject 的 Promise

    Promise 关键的实例方法:

    • promise.catch(onRejected):实际上就是 promise.then(undefined, onRejected) 的语法糖
    • promise.finally(onFinally):允许我们设置一个 onFinally 回调函数,在 promise fulfill 或者 reject 时调用

    顺序执行

    顺序执行意味着,每次只执行一系列任务中的一个,完成后再依次执行后面的任务。这一系列任务的先后顺序必须是预先定义好的,因为一个任务的结果有可能影响后续任务的执行。

    An example of sequential execution flow with three tasks

    上述执行流程有着不同形式的变种:

    • 顺序执行一系列已知的任务,不需要在它们之间传递数据
    • 前一个任务的输出作为后一个任务的输入(chainpipelinewaterfall
    • 迭代任务集合,同时在每个元素上一个接一个地运行异步任务

    package.json

    {
      "name": "03-promises-web-spider-v2",
      "version": "1.0.0",
      "private": true,
      "type": "module",
      "scripts": {
        "test": "echo \"Error: no test specified\" && exit 1"
      },
      "dependencies": {
        "cheerio": "^1.0.0-rc.3",
        "mkdirp": "^0.5.1",
        "superagent": "^5.2.2",
        "slug": "^1.1.0"
      },
      "engines": {
        "node": ">=14"
      },
      "engineStrict": true
    }
    

    spider.js

    import {promises as fsPromises} from 'fs'
    import {dirname} from 'path'
    import superagent from 'superagent'
    import mkdirp from 'mkdirp'
    import {urlToFilename, getPageLinks} from './utils.js'
    import {promisify} from 'util'
    
    const mkdirpPromises = promisify(mkdirp)
    
    function download(url, filename) {
      console.log(`Downloading ${url}`)
      let content
      return superagent.get(url)
        .then((res) => {
          content = res.text
          return mkdirpPromises(dirname(filename))
        })
        .then(() => fsPromises.writeFile(filename, content))
        .then(() => {
          console.log(`Downloaded and saved: ${url}`)
          return content
        })
    }
    
    function spiderLinks(currentUrl, content, nesting) {
      let promise = Promise.resolve()
      if (nesting === 0) {
        return promise
      }
      const links = getPageLinks(currentUrl, content)
      for (const link of links) {
        promise = promise.then(() => spider(link, nesting - 1))
      }
    
      return promise
    }
    
    export function spider(url, nesting) {
      const filename = urlToFilename(url)
      return fsPromises.readFile(filename, 'utf8')
        .catch((err) => {
          if (err.code !== 'ENOENT') {
            throw err
          }
    
          // The file doesn't exist, so let’s download it
          return download(url, filename)
        })
        .then(content => spiderLinks(url, content, nesting))
    }
    

    spider-cli.js

    import {spider} from './spider.js'
    
    const url = process.argv[2]
    const nesting = Number.parseInt(process.argv[3], 10) || 1
    
    spider(url, nesting)
      .then(() => console.log('Download complete'))
      .catch(err => console.error(err))
    

    utils.js

    import {join, extname} from 'path'
    import {URL} from 'url'
    import slug from 'slug'
    import cheerio from 'cheerio'
    
    function getLinkUrl(currentUrl, element) {
      const parsedLink = new URL(element.attribs.href || '', currentUrl)
      const currentParsedUrl = new URL(currentUrl)
      if (parsedLink.hostname !== currentParsedUrl.hostname ||
        !parsedLink.pathname) {
        return null
      }
      return parsedLink.toString()
    }
    
    export function urlToFilename(url) {
      const parsedUrl = new URL(url)
      const urlPath = parsedUrl.pathname.split('/')
        .filter(function (component) {
          return component !== ''
        })
        .map(function (component) {
          return slug(component, {remove: null})
        })
        .join('/')
      let filename = join(parsedUrl.hostname, urlPath)
      if (!extname(filename).match(/htm/)) {
        filename += '.html'
      }
    
      return filename
    }
    
    export function getPageLinks(currentUrl, body) {
      return Array.from(cheerio.load(body)('a'))
        .map(function (element) {
          return getLinkUrl(currentUrl, element)
        })
        .filter(Boolean)
    }
    

    node spider-cli.js http://www.baidu.com 2

    其中的 spiderLinks() 函数通过循环动态地构建了一条 Promise 链:

    • 先定义一个“空的” Promise 对象(resovle 到 undefined),这个空 Promise 只是作为链条的起点
    • 在循环中,不断将 promise 变量更新为新的 Promise 对象(通过调用上一个 Promise 的 then() 方法得到)。这就是 Promise 的异步遍历模式

    for 循环的最后,promise 变量会是最后一个 then() 方法返回的 Promise,因而只有当链条中的所有 Promise 都 resolve 时,promise 才会 resolve。

    纵观所有代码,我们可以不需要像使用 callback 那样,强制地包含众多错误传递逻辑。因而大大减少了代码量和出错的机会。

    并行执行

    在某些情况下,一系列异步任务的执行顺序并不重要,我们需要的只是当所有的任务都完成后能收到通知。

    An example of parallel execution with three tasks

    虽然 Node.js 是单线程的,但得益于其 non-blocking nature,我们仍可以实现并发行为。

    An example of how asynchronous tasks run in parallel

    比如我们有一个 Main 函数需要执行两个异步任务:

    • Main 函数首先触发异步任务 Task1 和 Task2 的执行。异步任务触发后,会将程序控制权立即交还给 Main 函数,再转交给 event loop
    • 当 Task1 中的异步任务结束时,event loop 调用 Task1 的回调函数,将控制权交给 Task1。Task1 执行完成自身内部的同步指令,通知 Main 函数并返还控制权
    • 当 Task2 中的异步任务结束时,event loop 调用 Task2 的回调函数,将控制权交给 Task2。在 Task2 的终点,Main 函数再次被通知。Main 函数得知 Task1 和 Task2 全部结束,继续执行或者返回结果

    简单来说,在 Node.js 中,我们只能并发地执行异步操作,因为它们的并发行为是由内部的非阻塞 API 控制的。同步(阻塞)操作无法并发地执行,除非它们的执行与异步操作交织在一起,或者由 setTimeout()setImmediate() 包裹。

    Promise 实现并发执行流,可以借助内置的 Promise.all() 方法。该方法会返回一个新的 Promise,只有当所有传入的 Promise 都 fulfill 时,新 Promise 才会 fulfill。如果传入的 Promise 之间没有因果关系,这些 Promise 就会并发地执行。

    对于前面的 spider 应用,只需要将 spiderLinks() 函数改为如下形式:

    function spiderLinks(currentUrl, content, nesting) {
      if (nesting === 0) {
        return Promise.resolve()
      }
      const links = getPageLinks(currentUrl, content)
      const promises = links.map(link => spider(link, nesting - 1))
      return Promise.all(promises)
    }
    

    Async/await

    Promise 链相对于 callback hell 来说肯定是要好太多的,但是我们仍然需要调用 then() 方法,以及为链条中的每一个任务创建新的函数,对于日常编程中非常普遍的控制流来说还是比较麻烦。而 Async/await 可以帮助我们写出像同步代码一样可读性强、容易理解的异步代码。
    Async 函数是一种特殊的函数,在函数体里面可以使用 await 表达式“暂停”任意一个 Promise 的执行,将控制权交还给 async 函数的调用者,等该 Promise revolve 后再返回到暂停的地方继续执行。

    function delay(milliseconds) {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve(new Date())
        }, milliseconds)
      })
    }
    
    async function playingWithDelays() {
      console.log('Initial date: ', new Date())
      const dateAfterOneSecond = await delay(1000)
      console.log('Date after one second: ', dateAfterOneSecond)
    
      const dateAfterThreeSeconds = await delay(3000)
      console.log('Date after 3 secnods: ', dateAfterThreeSeconds)
      return 'done'
    }
    
    playingWithDelays()
      .then(result => {
        console.log(`After 4 seconds: ${result}`)
      })
    

    错误处理

    Async/await 的另一个巨大的优势在于,它能够标准化 try...catch 代码块的行为,不管是针对同步代码中的 throw,抑或是异步代码中的 Promise reject。

    function delayError(milliseconds) {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          reject(new Error(`Error after ${milliseconds}ms`))
        })
      })
    }
    
    async function playingWithErrors(throwSyncError) {
      try {
        if (throwSyncError) {
          throw new Error('This is a synchronous error')
        }
        await delayError(1000)
      } catch (err) {
        console.log(`We have an error: ${err.message}`)
      } finally {
        console.log('Done')
      }
    }
    
    // playingWithErrors(true)
    playingWithErrors(false)
    

    串行执行

    借助 Async/await,可以对之前的 spider 应用实现很多优化。比如 download() 函数:

    async function download(url, filename) {
      console.log(`Downloading ${url}`)
      const {text: content} = await superagent.get(url)
      await mkdirpPromises(dirname(filename))
      await fsPromises.writeFile(filename, content)
      console.log(`Downloaded and saved: ${url}`)
      return content
    }
    

    整段代码行数大大减少,看起来也很“平整”,没有任何层级和缩进。

    接下来是 spiderLinks() 函数,使用 async/await 异步地遍历一个列表:

    async function spiderLinks(currentUrl, content, nesting) {
      if (nesting === 0) {
        return
      }
      const links = getPageLinks(currentUrl, content)
      for (const link of links) {
        await spider(link, nesting - 1)
      }
    }
    

    然后是 spider() 函数,如何简单地通过 try...catch 处理错误,令异步代码更加易读:

    export async function spider(url, nesting) {
      const filename = urlToFilename(url)
      let content
      try {
        content = await fsPromises.readFile(filename, 'utf8')
      } catch (err) {
        if (err.code !== 'ENOENT') {
          throw err
        }
        content = await download(url, filename)
      }
      return spiderLinks(url, content, nesting)
    }
    

    并行执行

    使用纯 async/await 实现并行的异步执行流程,可以参考如下代码:

    async function spiderLinks(currentUrl, content, nesting) {
      if (nesting === 0) {
        return
      }
      const links = getPageLinks(currentUrl, content)
      const promises = links.map(link => spider(link, nesting - 1))
      for (const promise of promises) {
        await promise
      }
    }
    

    然而上述代码存在一定的问题。如果列表中有一个 Promise reject 了,我们不得不等待列表中其他所有的 Promise 都 resolve,spiderLinks() 函数返回的 Promise 才会 reject。这种行为在多数情况下都是不理想的。
    我们通常都会想要在操作发生错误的第一时间捕获错误信息。因而并行执行异步操作,最后仍建议使用下面形式的代码:

    async function spiderLinks(currentUrl, content, nesting) {
      if (nesting === 0) {
        return
      }
      const links = getPageLinks(currentUrl, content)
      const promises = links.map(link => spider(link, nesting - 1))
      return Promise.all(promises)
    }
    

    参考资料

    Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

    相关文章

      网友评论

          本文标题:Node.js 设计模式笔记 —— 由 Promises 和 A

          本文链接:https://www.haomeiwen.com/subject/ynznnrtx.html