美文网首页
Chapter2:Asynchronous Control Fl

Chapter2:Asynchronous Control Fl

作者: 宫若石 | 来源:发表于2017-11-05 19:41 被阅读0次

    从一个同步式编程风格迁移到Node平台(在Node架构中,持续传递风格和异步接口是常用方式),将是一件令人受挫的事情。编写异步代码可能是一种与众不同的体验,特别是设计到控制流的时候。一些简单的问题,比如说:对一系列文件进行迭代、执行连续的一系列任务、或者等待一系列执行被完成,需要开发者采取新的方法或者技术来避免写出低效或者可读性差的代码。一种常见的问题是陷入回调地域问题的陷阱,并且看到代码规模横向发展而不是纵向,伴随着嵌套,使得即使是简单的日常工作也变得难以维护和阅读。

    在这篇文章中,我们可以看到它实际上可能驯服回调,并且能够通过设计模式和一些原则写出干净、可控制的异步代码。我们将看到如何控制流库,如异步,可以明显简化我们的问题,同时我们也能发现持续传递方式并不是唯一的方式来实现异步接口。事实上,我们将学会Promises和ECMAScript6中的generators如何成为有力而灵活的替代方案。对于每一个范例而言,我们将学习有助于帮助我们实现最常见控制流的模式,并且在最后一章,我们将准备有足够的自信去编写干净并且高效的异步代码的。

The difficulties of asynchronous programming

    在JavaScript中,异步代码失去控制,毫无疑问是十分容易的。闭包和就地定义匿名函数的方式允许一种平滑过渡的编程经验,而不需要开发者跳转到代码库中的其他点。这完全吻合原理:它使代码简洁,保持代码的流畅性,并且能在更短的时间内工作。不幸的是,牺牲代码质量,比如说:模块化、可重用性、可维护性,迟早会导致回调嵌套的失控扩散,函数规模的臃肿,并且导致糟糕的代码组织。大多数时间下,创建闭包并不是函数功能的需要。所以它更像原则问题,而不是一个与异步编程相关的问题。意识到我们的代码正在变得更丑陋或者更好,更进一步直到如何让代码不丑陋并采用合适的方式是区分新手和专家的标准。

Creating a simple web spider

    为了解释这个问题,我们将创造一个小小的网络爬虫,一个命令行应用程序,以URL作为输入,并将其内容本地下载到一个文件当中。在本章提供的代码中,我们将使用一些npm依赖:

•request:一个库来精简HTTP调用。

•mkdirp:一个小的实用应用程序来创建目录递归。

    此外,我们将经常引用一个名为./utilities的本地模块,其中包括一些我们将在应用程序中使用的辅助程序。我们为了简洁忽略文件中的内容,但是你可以找到完整的实现,伴随一个package.json包括完整的依赖列表,本书的包下载提供在:http://www.packtpub.com.。

    我们的应用中的核心函数被包括在一个名为spider.js的模块内部。我们来一起看看这段代码是什么样的。首先,我们先要下载我们将用到的所有依赖:

var request = require('request');

var fs = require('fs');

var mkdirp = require('mkdirp');

var path = require('path');

var utilities = require('./utilities');

    接下来我将创造一个新的函数被称为spider(),当下载过程完成时,将通过URL来下载并且回调callback函数将被调用。

function spider(url, callback) {

    var filename = utilities.urlToFilename(url);

    fs.exists(filename, function(exists) {     //[1]

        if(!exists) {

            console.log("Downloading " + url);

            request(url, function(err, response, body) {     //[2]

                if(err) {

                   callback(err);

               } else {

                   mkdirp(path.dirname(filename), function(err) {     //[3]

                       if(err) {

                          callback(err);

                       } else {

                           fs.writeFile(filename, body, function(err) {     //[4]

                                if(err) {

                                    callback(err);

                                } else {

                                    callback(null, filename, true);

                               }

                         });

                     }

                 });

             }

          });

        } else {

             callback(null, filename, false);

        }

    });

}

前述代码将会执行如下任务:

1.通过验证相应的文件是否已经完成创建,检查URL是否已经完成下载:

fs.exists(filecodename, function(exists) …

2.如果找不到文件,使用以下行代码来下载URL:

request(url, function(err, response, body) …

3.然后,我们将确定包含该文件的目录是否存在:

mkdirp(path.dirname(filename), function(err) …

4.最后我们将把HTTP响应的主体写入文件系统:

fs.writeFile(filename, body, function(err) …

    为了完善我们的爬虫应用程序,我们只需要通过提供一个URL作为输入,来调用spider()(在我们的例子中,我们从命令行参数中读取)。

spider(process.argv[2], function(err, filename, downloaded) {

    if(err) {

        console.log(err);

    } else if(downloaded){

        console.log('Completed the download of "'+ filename +'"');

    } else {

        console.log('"'+ filename +'" was already downloaded');

    }

});

    现在,首先我们准备去试试我们的网络爬虫程序,首先,请确保你有utilities.js模块,并且package.json包含你整个工程目录下的全部依赖。接下来通过运行一下代码来安装所有的依赖,代码如下:

npm install

    接下,来我们将执行spider模块来下载网页的内容,伴随着如下命令行:

node spider http://www.example.com

    我们的web爬虫应用程序需要我们总是包括协议头(比如:http://),在我们所提供的URL当中。另外不要期望HTML链接会被重写或者图片资源会被下载,因为这只是一个简单的例子来演示异步编程如何工作。

Generators

    ES6中的Generator的引入,极大程度上改变了JavaScript程序员对迭代器的看法,并为解决callback hell提供了新方法。

    迭代器模式是很常用的设计模式,但是实现起来,很多东西是程序化的;当迭代规则比较复杂时,维护迭代器内的状态,是比较麻烦的。 于是有了generator,何为generator?

Generators: a better way to build Iterators.

借助 yield 关键字,可以更优雅的实现fibonacci数列:

function* fibonacci() {

    let a = 0, b = 1;

    while(true) {

        yield a; [a, b] = [b, a + b];

} }

yield与异步

    yield可以暂停运行流程,那么便为改变执行流程提供了可能。这和Python的coroutine类似。

    Geneartor之所以可用来控制代码流程,就是通过yield来将两个或者多个Geneartor的执行路径互相切换。这种切换是语句级别的,而不是函数调用级别的。其本质是CPS变换。

    yield之后,实际上本次调用就结束了,控制权实际上已经转到了外部调用了generator的next方法的函数,调用的过程中伴随着状态的改变。那么如果外部函数不继续调用next方法,那么yield所在函数就相当于停在yield那里了。所以把异步的东西做完,要函数继续执行,只要在合适的地方再次调用generator 的next就行,就好像函数在暂停后,继续执行。

Buffer

    在Node.js中,Buffer类是随Node内核一起发布的核心库。Buffer库为Node.js带来了一种存储原始数据的方法,可以让Nodejs处理二进制数据,每当需要在Nodejs中处理I/O操作中移动的数据时,就有可能使用Buffer库。原始数据存储在 Buffer 类的实例中。一个 Buffer 类似于一个整数数组,但它对应于 V8 堆内存之外的一块原始内存。

    Buffer 和 Javascript 字符串对象之间的转换需要显式地调用编码方法来完成。以下是几种不同的字符串编码:

     ‘ascii’ – 仅用于 7 位 ASCII 字符。这种编码方法非常快,并且会丢弃高位数据。

     ‘utf8’ – 多字节编码的 Unicode 字符。许多网页和其他文件格式使用 UTF-8。

     ‘ucs2’ – 两个字节,以小尾字节序(little-endian)编码的 Unicode 字符。它只能对 BMP(基本多文种平面,U+0000 – U+FFFF) 范围内的字符编码。

    ‘base64’ – Base64 字符串编码。

    ‘binary’ – 一种将原始二进制数据转换成字符串的编码方式,仅使用每个字符的前 8 位。这种编码方法已经过时,应当尽可能地使用 Buffer 对象。

    'hex' - 每个字节都采用 2 进制编码。

    在Buffer中创建一个数组,需要注意以下规则:

    Buffer 是内存拷贝,而不是内存共享。

    Buffer 占用内存被解释为一个数组,而不是字节数组。比如,new Uint32Array(new Buffer([1,2,3,4])) 创建了4个 Uint32Array,它的成员为 [1,2,3,4] ,而不是[0x1020304] 或 [0x4030201]。

slab 分配

    在 lib/buffer.js 模块中,有个模块私有变量 pool, 它指向当前的一个8K 的slab :

Buffer.poolSize = 8 * 1024;

var pool;

function allocPool() {

    pool = new SlowBuffer(Buffer.poolSize);

    pool.used = 0;

}

SlowBuffer 为 src/node_buffer.cc 导出,当用户调用new Buffer时 ,如果你要申请的空间大于8K,node 会直接调用SlowBuffer ,如果小于8K ,新的Buffer 会建立在当前slab 之上:

    新创建的Buffer的 parent成员变量会指向这个slab ,

    offset 变量指向在这个slab 中的偏移:

if (!pool || pool.length - pool.used < this.length) allocPool();

this.parent = pool;

this.offset = pool.used;

pool.used += this.length;

在 lib/_tls_legacy.js 中,SlabBuffer创建了一个 10MB 的 slab:

function alignPool() {

    // Ensure aligned slices

   if (poolOffset & 0x7) {

       poolOffset |= 0x7; poolOffset++;

   }

}

    这里做了8字节的内存对齐处理。

    如果不按照平台要求对数据存放进行对齐,会带来存取效率上的损失。比如32位的Intel处理器通过总线访问(包括读和写)内存数据。每个总线周期从偶地址开始访问32位内存数据,内存数据以字节为单位存放。如果一个32位的数据没有存放在4字节整除的内存地址处,那么处理器就需要2个总线周期对其进行访问,显然访问效率下降很多。

    Node.js 是一个跨平台的语言,第三方的C++ addon 也是非常多,避免破坏了第三方模块的使用,比如 directIO 就必须要内存对齐。

浅拷贝

    Buffer更像是可以做指针操作的C语言数组。例如,可以用[index]方式直接修改某个位置的字节。 需要注意的是:Buffer#slice 方法, 不是返回一个新的Buffer, 而是返回对原 Buffer 某个区间数值的引用。

const buf1 = Buffer.allocUnsafe(26);

for (var i = 0 ; i < 26 ; i++) {

    buf1[i] = i + 97; // 97 is ASCII a

}

const buf2 = buf1.slice(0, 3);

buf2.toString('ascii', 0, buf2.length);

    // Returns: 'abc'

buf1[0] = 33;

buf2.toString('ascii', 0, buf2.length);

    // Returns : '!bc'

官方 API 提供的例子,buf2是对buf1前3个字节的引用,对buf2的修改就相当于作用在buf1上。

深拷贝

    如果想要拷贝一份Buffer,得首先创建一个新的Buffer,并通过.copy方法把原Buffer中的数据复制过去。

const buf1 = Buffer.allocUnsafe(26);

const buf2 = Buffer.allocUnsafe(26).fill('!');

for (let i = 0 ; i < 26 ; i++) {

    buf1[i] = i + 97; // 97 is ASCII a

}

buf1.copy(buf2, 8, 16, 20);

console.log(buf2.toString('ascii', 0, 25));

    // Prints: !!!!!!!!qrst!!!!!!!!!!!!!

    通过深拷贝的方式,buf2截取了buf1的部分内容,之后对buf2的修改并不会作用于buf1, 两者内容独立不共享。需要注意的事:深拷贝是一种消耗 CPU 和内存的操作,需要非常谨慎。

内存碎片

    动态分配将不可避免会产生内存碎片的问题,那么什么是内存碎片? 内存碎片即“碎片的内存”描述一个系统中所有不可用的空闲内存,这些碎片之所以不能被使用,是因为负责动态分配内存的分配算法使得这些空闲的内存无法使用。

    上述的 slab 分配,存在明显的内存碎片,即 8KB 的内存并没有完全被使用,存在一定的浪费。通用的slab实现,会浪费约1/2的空间。

    当然存在更高效,更省内存的内存管理分配,比如 tcmalloc, 但也必须承受一定的管理代价。node.js 在这方面并没有一味的执着于此,而是达到一种性能与空间使用的平衡。

PM2源码浅析

    近年来,大前端和全栈的思潮下,很多公司的项目转成了node驱动,pm2做为一个带有负载均衡功能的进程管理器,是众多公司的主流方案。

PM2工作原理:

    要理解pm2就要理解god和santan的关系:god的职责是守护进程,重启进程。santan的职责是异常进程的退出,杀死进程,毁灭进程等工作。

架构图

god和santan通讯的方式,是RPC:

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

执行流程图

总结

    pm2的集群,从原理是采用cluster.fork来实现的,深入理解cluser模块,精度pm2的源代码,能更好的理解pm2,更好的理解node设计思想

ES7+ES8新趋势与异步处理:

ES7新特性:

Array.prototype.includes()方法:

    includes()的作用,是查找一个值在不在数组里,若在,则返回true,反之返回false。 基本用法:

['a', 'b', 'c'].includes('a')     // true

['a', 'b', 'c'].includes('d')     // false

Array.prototype.includes()方法接收两个参数:要搜索的值和搜索的开始索引。当第二个参数被传入时,该方法会从索引处开始往后搜索(默认索引值为0)。若搜索值在数组中存在则返回true,否则返回false。 且看下面示例:

['a', 'b', 'c', 'd'].includes('b')         // true

['a', 'b', 'c', 'd'].includes('b', 1)      // true

['a', 'b', 'c', 'd'].includes('b', 2)      // false

那么,我们会联想到ES6里数组的另一个方法indexOf,下面的示例代码是等效的:

['a', 'b', 'c'].includes('a')          //true

['a', 'b', 'c'].indexOf('a') > -1      //true

*异步函数(Async functions)

为什么要引入async?

    众所周知,JavaScript语言的执行环境是“单线程”的,那么异步编程对JavaScript语言来说就显得尤为重要。以前我们大多数的做法是使用回调函数来实现JavaScript语言的异步编程。回调函数本身没有问题,但如果出现多个回调函数嵌套,例如:进入某个页面,需要先登录,拿到用户信息之后,调取用户商品信息,代码如下:

this.$http.jsonp('/login', (res) => {

      this.$http.jsonp('/getInfo', (info) => {

          // do something

      })

})

    假如上面还有更多的请求操作,就会出现多重嵌套。代码很快就会乱成一团,这种情况就被称为“回调函数地狱”(callback hell)。

    于是,我们提出了Promise,它将回调函数的嵌套,改成了链式调用。写法如下:

var promise = new Promise((resolve, reject) => {  

    this.login(resolve)

})

.then(() => this.getInfo())

.catch(() => { console.log("Error") })

    从上面可以看出,Promise的写法只是回调函数的改进,使用then方法,只是让异步任务的两段执行更清楚而已。Promise的最大问题是代码冗余,请求任务多时,一堆的then,也使得原来的语义变得很不清楚。此时我们引入了另外一种异步编程的机制:Generator。

    Generator 函数是一个普通函数,但是有两个特征。一是,function关键字与函数名之间有一个星号;二是,函数体内部使用yield表达式,定义不同的内部状态(yield在英语里的意思就是“产出”)。一个简单的例子用来说明它的用法:

function* helloWorldGenerator() {

    yield 'hello';

    yield 'world';  

    return 'ending';

}

var hw = helloWorldGenerator();

    上面代码定义了一个 Generator 函数helloWorldGenerator,它内部有两个yield表达式(hello和world),即该函数有三个状态:hello,world 和 return 语句(结束执行)。Generator 函数的调用方法与普通函数一样,也是在函数名后面加上一对圆括号。不同的是,调用 Generator 函数后,该函数并不执行,返回的也不是函数运行结果,而是一个指向内部状态的指针对象,必须调用遍历器对象的next方法,使得指针移向下一个状态。也就是说,每次调用next方法,内部指针就从函数头部或上一次停下来的地方开始执行,直到遇到下一个yield表达式(或return语句)为止。换言之,Generator 函数是分段执行的,yield表达式是暂停执行的标记,而next方法可以恢复执行。上述代码分步执行如下:

hw.next()

// { value: 'hello', done: false }

hw.next()

// { value: 'world', done: false }

hw.next()

// { value: 'ending', done: true }

hw.next()

// { value: undefined, done: true }

    Generator函数的机制更符合我们理解的异步编程思想。

    用户登录的例子,我们用Generator来写,如下:

var gen = function* () {  

    const f1 = yield this.login()

    const f2 = yield this.getInfo()

};

    虽然Generator将异步操作表示得很简洁,但是流程管理却不方便(即何时执行第一阶段、何时执行第二阶段)。此时,我们便希望能出现一种能自动执行Generator函数的方法。我们的主角来了:async/await。

ES8引入了async函数,使得异步操作变得更加方便。简单说来,它就是Generator函数的语法糖。

async function asyncFunc(params) {  

    const result1 = await this.login()  

    const result2 = await this.getInfo()

}

更加简洁易懂

变体,异步函数存在以下四种使用形式:

    函数声明:asyncfunctionfoo(){}

    函数表达式:constfoo=asyncfunction(){}

    对象的方式:letobj={asyncfoo(){}}

    箭头函数:constfoo=async()=>{}

常见用法汇总:

处理单个异步结果:

async function asyncFunc() {

    const result = await otherAsyncFunc();

     console.log(result);

}

顺序处理多个异步结果:

async function asyncFunc() {  

    const result1 = await otherAsyncFunc1();  

    console.log(result1);  

    const result2 = await otherAsyncFunc2();

    console.log(result2);

}

并行处理多个异步结果:

async function asyncFunc() {  

    const [result1, result2] = await Promise.all([  

        otherAsyncFunc1(),    otherAsyncFunc2()

    ]);  

   console.log(result1, result2);

}

处理错误:

async function asyncFunc() {  

    try {    

       await otherAsyncFunc();  

   } catch (err) {

   console.error(err);  

  }

}

网络 (Net) 

网络模型

    ISO制定的OSI参考模型的过于庞大、复杂招致了许多批评。与此对照,由技术人员自己开发的TCP/ IP协议栈获得了更为广泛的应用。如图所示,是TCP/IP参考模型和OSI参考模型的对比示意图。

UDP vs TCP

    TCP(Transmission Control Protocol):传输控制协议

    UDP(User Datagram Protocol):用户数据报协议

    主要在于连接性(Connectivity)、可靠性(Reliability)、有序性(Ordering)、有界性(Boundary)、拥塞控制(Congestion or Flow control)、传输速度(Speed)、量级(Heavy/Light weight)、头部大小(Header size)等差异。

主要差异:

1.TCP是面向连接(Connection oriented)的协议,UDP是无连接(Connection less)协议;

    TCP用三次握手建立连接:1) Client向server发送SYN;2) Server接收到SYN,回复Client一个SYN-ACK;3)Client接收到SYN_ACK,回复Server一个ACK。到此,连接建成。

    UDP发送数据前不需要建立连接。

2.TCP可靠,UDP不可靠;

    TCP丢包会自动重传,UDP不会。

3.TCP有序,UDP无序;

    消息在传输过程中可能会乱序,后发送的消息可能会先到达,TCP会对其进行重排序,UDP不会。从程序实现的角度来看,可以用下图来进行描述。

    从上图也能清晰的看出,TCP通信需要服务器端侦听listen、接收客户端连接请求accept,等待客户端connect建立连接后才能进行数据包的收发(recv/send)工作。而UDP则服务器和客户端的概念不明显,服务器端即接收端需要绑定端口,等待客户端的数据的到来。后续便可以进行数据的收发(recvfrom/sendto)工作。

Socket 抽象

    Socket 是对 TCP/IP 协议族的一种封装,是应用层与TCP/IP协议族通信的中间软件抽象层。它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。

    Socket 还可以认为是一种网络间不同计算机上的进程通信的一种方法,利用三元组(ip地址,协议,端口)就可以唯一标识网络中的进程,网络中的进程通信可以利用这个标志与其它进程进行交互。

    Socket 起源于 Unix ,Unix/Linux 基本哲学之一就是“一切皆文件”,都可以用“打开(open) –> 读写(write/read) –> 关闭(close)”模式来进行操作。因此 Socket 也被处理为一种特殊的文件。

TCP Socket

    Node.js 的 Net模块也对 TCP socket 进行了抽象封装:

function Socket(options) {

    if (!(this instanceof Socket)) return new Socket(options);

    this._connecting = false;

    this._hadError = false;

    this._handle = null;

    this._parent = null;

    this._host = null;

    if (typeof options === 'number')

        options = { fd: options }; // Legacy interface.

    else if (options === undefined)

        options = {};

    stream.Duplex.call(this, options);

    if (options.handle) {

        this._handle = options.handle; // private

    } else if (options.fd !== undefined) {

        this._handle = createHandle(options.fd);

        this._handle.open(options.fd);

        if ((options.fd == 1 || options.fd == 2) &&

           (this._handle instanceof Pipe) &&

           process.platform === 'win32') {

           // Make stdout and stderr blocking on Windows

          var err = this._handle.setBlocking(true);

          if (err)

              throw errnoException(err, 'setBlocking');

          }

         this.readable = options.readable !== false;

         this.writable = options.writable !== false;

       } else {

          // these will be set once there is a connection

          this.readable = this.writable = false;

      }

     // shut down the socket when we're finished with it.

     this.on('finish', onSocketFinish);

     this.on('_socketEnd', onSocketEnd);

     initSocketHandle(this);

     // ...

}

util.inherits(Socket, stream.Duplex);

    首先Socket是一个全双工的 Stream,所以继承了 Duplex。通过createHandle创建套接字并赋值到this._handle上。同时监听finish,_socketEnd事件:

粘包

一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。

情况分析

    TCP粘包通常在流传输中出现,UDP则不会出现粘包,因为UDP有消息边界,发送数据段需要等待缓冲区满了才将数据发送出去,当满的时候有可能不是一条消息而是几条消息合并在换中去内,在成粘包;另外接收数据端没能及时接收缓冲区的包,造成了缓冲区多包合并接收,也是粘包。

解决办法

    自定义应用层协议;

    不使用Nagle算法, 使用提供的 API:socket.setNoDelay。

UDP

UDP Socket:

function Socket(type, listener) {

    EventEmitter.call(this);

    if (typeof type === 'object') {

        var options = type;

        type = options.type;

    }

    var handle = newHandle(type);

    handle.owner = this;

    this._handle = handle;

    this._receiving = false;

    this._bindState = BIND_STATE_UNBOUND;

    this.type = type; this.fd = null; // compatibility hack

    // If true - UV_UDP_REUSEADDR flag will be set

    this._reuseAddr = options && options.reuseAddr;

    if (typeof listener === 'function')

        this.on('message', listener);

}

util.inherits(Socket, EventEmitter);

    UDP 继承了EventEmitter, 同样也支持 IPV4和 IPV6协议, 由type区分,this._reuseAddr标识是否要使用选项:SO_REUSEADDR。

    SO_REUSEADDR允许完全重复的捆绑:当一个IP地址和端口绑定到某个套接口上时,还允许此IP地址和端口捆绑到另一个套接口上。一般来说,这个特性仅在支持多播的系统上才有,而且只对UDP套接口而言(TCP不支持多播)。

总结

    尽量不要尝试去使用UDP,除非知道丢包了对于应用是没有影响的,否则排查网络丢包会很困难。

相关文章

网友评论

      本文标题:Chapter2:Asynchronous Control Fl

      本文链接:https://www.haomeiwen.com/subject/ghshmxtx.html