许多人想知道单线程的Node.js如何与多线程后端竞争。因此,考虑到Node既有的单线程特性,那么多的大公司选择Node作为它们的后端似乎是违反直觉的。要知道为什么,我们必须理解Node单线程的真正含义。
当初创建JavaScript的目的只是为了在web上做一些简单的事情,比如验证表单,或者创建一个彩虹色的鼠标轨迹。直到2009年,Ryan Dahl才创建了Node,使开发人员能够使用该语言编写后端代码。
后端语言通常支持多线程,有各种机制可以在线程之间同步数据,以及支持线程相关的其他特性。要在JavaScript中支持这些东西,就需要改变整个语言,而这并不是Dahl真正的目标。为了让普通JavaScript支持多线程,他必须创建一个变通方案。让我们了解一下……
Node.js到底是如何工作的
Node.js 使用两种线程:一个主线程由event loop处理,其他辅助线程由worker pool处理。
事件循环是一种获取回调(函数)并将其注册以备将来执行的机制。它与平常的JavaScript代码在相同的线程中运行。当JavaScript操作阻塞线程时,事件循环也被阻塞。
Worker pool 是一种执行模型,它生成并处理单独的线程,然后这些线程同步执行任务并将结果返回给事件循环。然后,事件循环使用上述结果执行提供的回调。
简而言之,它负责异步I/O操作——主要是与系统的磁盘和网络的交互。它主要用于 fs
(I/O密集型)或crypto
(CPU密集型)等模块。Worker pool 是在libuv中实现的,每当Node需要在JavaScript和C++之间进行内部通信时,它都会导致轻微的延迟,但这并不明显。
有了这两种机制,我们可以这样写代码:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
if (err) {
return null;
}
console.log(content.toString());
});
前面提到的fs模块告诉worker pool使用它的一个线程来读取文件的内容,并在完成时通知事件循环。然后,事件循环接受提供的回调函数并使用文件的内容执行它。
以上是一个非阻塞代码的例子。因此,我们不必同步等待某件事发生。我们告诉worker pool读取文件并使用执行结果调用提供的函数。由于worker pool有自己的线程,所以在读取文件时,事件循环可以继续正常执行。
在需要同步执行某些复杂的操作之前,一切都是正常的:任何花费太长时间运行的函数都会阻塞线程。如果一个应用程序有很多这样的功能,它可能会显著降低服务器的吞吐量,或者完全卡死。在这种情况下,无法将工作分配给worker pool。
需要复杂计算的领域——如AI、机器学习或大数据——实际上不能有效地使用Node.js,因为这些操作阻塞了仅有的一个线程(主线程),使服务器无响应。这种情况一直持续到Node.js v10.5.0出现,它增加了对多线程的支持。
worker_threads
worker_threads
模块是一个包,它允许我们创建全功能的多线程Node.js应用程序。
线程worker是在单独的线程中生成的一段代码(通常从文件中获取)。
注意,术语thread worker、worker和thread经常交替使用;它们都指的是同一件事。
要开始使用线程worker,我们必须导入 worker_threads
模块。我们先创建一个函数来帮助生成这些线程worker,然后再讨论它们的属性。
type WorkerCallback = (err: any, result?: any) => any;
export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
const worker = new Worker(path, { workerData });
worker.on('message', cb.bind(null, null));
worker.on('error', cb);
worker.on('exit', (exitCode) => {
if (exitCode === 0) {
return null;
}
return cb(new Error(`Worker has stopped with code ${exitCode}`));
});
return worker;
}
要创建worker,我们必须创建Worker
类的一个实例。在第一个参数中,我们提供了包含worker代码的文件的路径;第二个参数,我们提供了一个对象,其中包含一个名为workerData
的属性。这是我们希望线程在开始运行时能够访问的数据。
注意,无论你使用的是JavaScript本身,还是可转换为JavaScript的语言(比如TypeScript),路径都应该是指向带有 .js
或.mjs
扩展名的文件。
我还想指出为什么我们使用回调方法,而不是返回一个在message
事件触发时被解决的promise。这是因为worker可以发送多个 message
事件,而不是一个。
正如你在上面的示例中所看到的,线程之间的通信是基于事件的,这就是说我们可以设置事件监听器,以便在worker触发指定事件时调用它。
以下是最常见的事件:
worker.on('error', (error) => {});
当worker中出现未捕获的异常时,就会发出error
事件。然后终止worker,错误作为回调函数的第一个参数传递。
worker.on('exit', (exitCode) => {});
当worker 退出时,会发送exit
事件。如果在worker内部调用了process.exit()
,则会向回调函数提供状态码exitCode
。如果使用worker.terminate()
终止worker ,状态码为1。
worker.on('online', () => {});
在worker 停止解析JavaScript代码并开始执行时发送online
事件。它不常用,但可以在特定的情况下提供有效信息。
worker.on('message', (data) => {});
worker 向父线程发送数据时会发送message
事件。
现在,我们来看看如何在线程之间共享数据。
线程之间交换数据
要将数据发送到另一个线程,我们使用port.postMessage()
方法。函数签名如下:
port.postMessage(data[, transferList])
端口对象可以是parentPort
,也可以是 MessagePort
的一个实例。稍后再详细介绍。
data 参数
第一个参数 data
是一个复制到另一个线程的对象。它可以包含复制算法支持的任何内容。
数据由结构化克隆算法复制。据Mozilla:
它通过递归遍历输入对象来构建克隆,同时维护以前访问过的引用的映射,以避免无限遍历循环。
该算法不复制函数、错误对象、属性描述符或原型链。还应该注意,以这种方式复制对象与JSON不同,因为它可以包含循环引用和类型化数组,而JSON不能。
通过支持类型化数组的复制,该算法使得在线程之间共享内存成为可能。
线程间共享内存
人们可能会说,cluster
或child_process
之类的模块在很久以前就启用了线程。对,也不对。
cluster
模块可以创建多个node实例,由一个主进程在它们之间路由分发收到的请求。集群应用程序有效地成倍增加服务器吞吐量;但是,我们不能使用 cluster
模块派生一个单独的线程。
人们倾向于使用PM2这样的工具管理集群应用程序,而不是在代码中手动处理。但是如果你有兴趣,你可以看下我的这篇关于如何使用cluster
模块的帖子。
child_process
模块可以生成任何可执行文件,不管它是不是JavaScript。它非常类似,但是它缺少worker_threads
所具有的几个重要特性。
具体来说,线程worker更轻量级,并且与父线程共享相同的进程ID。它们还可以与父线程共享内存,这使它们可以避免序列化大的数据负载,从而更有效地来回发送数据。
现在让我们看一个如何在线程之间共享内存的示例。为了共享内存,必须将ArrayBuffer
或SharedArrayBuffer
的实例作为数据参数或置于数据参数内部发送给另一个线程。
这是一个与父线程共享内存的worker :
import { parentPort } from 'worker_threads';
parentPort.on('message', () => {
const numberOfElements = 100;
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
const arr = new Int32Array(sharedBuffer);
for (let i = 0; i < numberOfElements; i += 1) {
arr[i] = Math.round(Math.random() * 30);
}
parentPort.postMessage({ arr });
});
首先,我们创建一个SharedArrayBuffer
,其中的内存需要包含100个32位整数。接下来,我们创建一个Int32Array
的实例,它将使用缓冲区来保存它的结构,然后我们用一些随机数填充数组并将其发送给父线程。
父线程:
import path from 'path';
import { runWorker } from '../run-worker';
const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
if (err) {
return null;
}
arr[0] = 5;
});
worker.postMessage({});
通过将'arr[0]
更改为5
,我们实际上在两个线程中都修改了它。
当然,通过共享内存,我们可能面临一个风险:在一个线程中更改一个值,另一个线程中也随之改变了。但是我们同时也获得了一个非常好的特性:不需要序列化值就可以在另一个线程中使用,这极大地提高了效率。只需记住正确地管理对数据的引用,以便在完成数据处理后对其进行垃圾收集。
共享一个整数数组是可以的,但我们真正感兴趣的是共享对象——存储信息的默认方式。不幸的是,没有SharedObjectBuffer
或类似的东西,但我们可以自己创建一个类似的结构。
transferList 参数
transferList
只能包含ArrayBuffer
和MessagePort
。一旦它们被转移到另一个线程,就不能再在发送线程中使用:内存被移动到另一个线程,因此在发送线程中不可用。
目前,我们还不能通过将它们包含在transferList
中来传输网络套接字(这个可以通过child_process
模块来实现)。
为通信创建通道
线程之间的通信通过端口进行,端口是MessagePort
类的实例,支持基于事件的通信。
使用端口在线程之间进行通信有两种方法。第一个是默认的,也是两个中比较简单的一个。在worker的代码中,我们从worker_threads
模块导入一个名为parentPort
的对象,并使用该对象的.postMessage()
方法向父线程发送消息。
这里有一个例子:
import { parentPort } from 'worker_threads';
const data = {
// ...
};
parentPort.postMessage(data);
parentPort
是Node.js在后台为我们创建的MessagePort
的一个实例,它支持与父线程的通信。这样,我们可以通过使用parentPort
和 worker
对象在线程之间进行通信。
线程之间通信的第二种方式是实际创建一个自己的MessageChannel
并将它发送给worker。下面演示了如何创建一个新的MessagePort
,并与worker共享:
import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker(path.join(__dirname, 'worker.js'));
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => {
console.log('message from worker:', message);
});
worker.postMessage({ port: port2 }, [port2]);
创建port1
和port2
之后,我们在port1
上设置事件监听器,并将port2
发送给wroker。我们必须把它包括在transferList
中,以便转移到worker一方。
现在,在worker内部:
import { parentPort, MessagePort } from 'worker_threads';
parentPort.on('message', (data) => {
const { port }: { port: MessagePort } = data;
port.postMessage('heres your message!');
});
通过这种方式,我们使用父线程发送的端口。
使用parentPort
不一定是一个错误的方法,但更好的方法是创建一个新的MessagePort
,其中包含一个MessageChannel
实例,然后与派生的worker共享它(即:关注点分离)。
请注意,在下面的示例中,我使用了 parentPort
来简化。
使用worker的两种方法
我们有两种使用worker的方法。第一种方法是生成一个worker,执行它的代码,然后将结果发送给父线程。使用这种方法,每次出现新任务时,我们都必须重新创建一个worker。
第二种方法是派生一个worker并为message
事件设置侦听器。每次触发message
时,它都会执行工作并将结果发送回父线程,父线程将保持worker的活动状态,以供以后使用。
Node.js 文档推荐使用第二种方法,因为实际创建一个线程worker需要花费很多精力,这需要创建一个虚拟机并解析和执行代码。这种方法也比不断创建worker更有效。
这种方法称为线程池,因为我们创建一个线程池并让他们等待,在需要时发送message
事件来完成工作。
下面是一个文件的例子,其中包含了一个worker的创建、执行和关闭的过程:
import { parentPort } from 'worker_threads';
const collection = [];
for (let i = 0; i < 10; i += 1) {
collection[i] = i;
}
parentPort.postMessage(collection);
将collection
发送到父线程后,它就退出了。
这里有一个worker的例子,它可以等待很长一段时间才得到任务:
import { parentPort } from 'worker_threads';
parentPort.on('message', (data: any) => {
const result = doSomething(data);
parentPort.postMessage(result);
});
worker_threads 模块提供的有用属性
在worker_threads
模块中有一些可用的属性:
isMainThread
当不在worker线程内操作时,该属性为true
。如果需要,可以在worker文件的开头包含一个简单的if
语句,以确保它只是作为worker运行。
import { isMainThread } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
workerData
包含在生成的线程worker构造函数中的数据。
const worker = new Worker(path, { workerData });
在worker线程里:
import { workerData } from 'worker_threads';
console.log(workerData.property);
parentPort
前面提到的用于与父线程通信的MessagePort
实例。
threadId
分配给worker的唯一标识符。
现在我们知道了技术细节,让我们实现一些东西并在实践中测试我们的知识。
实现 setTimeout
setTimeout
是一个无限循环,顾名思义,它会让应用程序超时。实际上,它会在每次迭代中检查起始时间和给定毫秒数的总和是否小于当前时间。
import { parentPort, workerData } from 'worker_threads';
const time = Date.now();
while (true) {
if (time + workerData.time <= Date.now()) {
parentPort.postMessage({});
break;
}
}
这个特定的实现生成一个线程,执行它的代码,然后在完成之后退出。
让我们尝试实现使用这个worker的代码。首先,让我们创建一个状态,在这个状态中,我们将跟踪生成的worker:
const timeoutState: { [key: string]: Worker } = { };
接下来是负责创建worker并将其保存到状态中的函数:
export function setTimeout(callback: (err: any) => any, time: number) {
const id = uuidv4();
const worker = runWorker(
path.join(__dirname, './timeout-worker.js'),
(err) => {
if (!timeoutState[id]) {
return null;
}
timeoutState[id] = null;
if (err) {
return callback(err);
}
callback(null);
},
{
time,
},
);
timeoutState[id] = worker;
return id;
}
首先,我们使用UUID包为worker创建一个惟一的标识符,然后使用前面定义的帮助函数runWorker
来获取worker。我们还向worker传递一个回调函数,该函数在worker发送一些数据时触发。最后,我们将worker保存到状态并返回id
。
在回调函数中,我们必须检查worker是否仍然存在于状态中,因为可能存在cancelTimeout()
,会把它删除。如果它确实存在,我们将它从状态中删除,并调用传递给setTimeout
函数的callback
。
cancelTimeout
函数使用.terminate()
方法强迫worker退出,并将worker从状态中移除:
export function cancelTimeout(id: string) {
if (timeoutState[id]) {
timeoutState[id].terminate();
timeoutState[id] = undefined;
return true;
}
return false;
}
我创建了一小段测试代码,用于检查这种方法与原生方法有多大差别。你可以查看这里的代码。结果如下:
native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
我们可以看到在setTimeout
中有一个轻微的延迟(大约40ms),这是由正在创建的worker造成的。平均CPU成本也稍微高一些,但是还可以接受(CPU成本是整个进程期间CPU使用量的平均值)。
如果我们可以重用worker,我们将降低延迟和CPU使用率,这就是为什么我们现在要研究如何实现我们自己的线程池。
实现一个线程池
如前所述,一个线程池是一个给定数量的先前创建的worker线程,他们等待并监听message
事件。一旦触发了message
事件,它们就执行工作并发回结果。
为了更好地说明我们将要做什么,下面是如何创建一个8个worker线程的线程池:
const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
如果你熟悉限制并发操作,那么你将看到这里的逻辑几乎是相同的,只是不同的应用场景。
如上面的代码片段所示,我们向WorkerPool
的构造函数传递worker的路径和要创建的worker的数量。
export class WorkerPool<T, N> {
private queue: QueueItem<T, N>[] = [];
private workersById: { [key: number]: Worker } = {};
private activeWorkersById: { [key: number]: boolean } = {};
public constructor(public workerPath: string, public numberOfThreads: number) {
this.init();
}
}
在这里,我们有额外的属性,比如workersById
和activeWorkersById
,在这些属性中,我们可以分别保存现有worker和当前运行worker的id。还有queue
,我们可以用下面的结构保存对象:
type QueueCallback<N> = (err: any, result?: N) => void;
interface QueueItem<T, N> {
callback: QueueCallback<N>;
getData: () => T;
}
callback
只是默认的node回调,它的第一个参数是error,第二个参数是可能的结果。getData
是传递给线程池的.run()
方法的函数(下面将对此进行解释),该方法在项目开始处理时调用。getData
函数返回的数据将被传递给worker线程。
在.init()
方法中,我们创建worker并将他们保存在状态中:
private init() {
if (this.numberOfThreads < 1) {
return null;
}
for (let i = 0; i < this.numberOfThreads; i += 1) {
const worker = new Worker(this.workerPath);
this.workersById[i] = worker;
this.activeWorkersById[i] = false;
}
}
为了避免无限循环,我们首先确保线程的数量是>1。然后,我们创建有效数量的worker,并通过它们在workersById
状态下的索引保存它们。我们保存它们当前是否运行在 activeWorkersById
状态的信息,这个状态在默认情况下总是为false。
现在,我们必须实现前面提到的.run()
方法来设置一个任务,以便在worker可用时运行。
public run(getData: () => T) {
return new Promise<N>((resolve, reject) => {
const availableWorkerId = this.getInactiveWorkerId();
const queueItem: QueueItem<T, N> = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
},
};
if (availableWorkerId === -1) {
this.queue.push(queueItem);
return null;
}
this.runWorker(availableWorkerId, queueItem);
});
}
在传递给promise的函数内部,我们首先检查是否有一个worker可以通过调用.getInactiveWorkerId()
来处理数据:
private getInactiveWorkerId(): number {
for (let i = 0; i < this.numberOfThreads; i += 1) {
if (!this.activeWorkersById[i]) {
return i;
}
}
return -1;
}
接下来,我们创建了一个queueItem
,其中保存传递给 .run()
方法的getData
函数和回调函数。在回调中,我们要么resolve
要么reject
该promise,这取决于worker是否将错误传递给回调。
如果 availableWorkerId
是-1,那么没有可用的worker,我们将queueItem
添加到queue
。如果有一个可用的worker,我们调用 . runworker()
方法来执行这个worker。
在.runWorker()
方法中,我们必须在activeWorkersById
状态中设置当前正在使用的worker;为 message
和error
事件设置事件监听器(并在随后清除);最后,将数据发送给worker。
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
const worker = this.workersById[workerId];
this.activeWorkersById[workerId] = true;
const messageCallback = (result: N) => {
queueItem.callback(null, result);
cleanUp();
};
const errorCallback = (error: any) => {
queueItem.callback(error);
cleanUp();
};
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');
this.activeWorkersById[workerId] = false;
if (!this.queue.length) {
return null;
}
this.runWorker(workerId, this.queue.shift());
};
worker.once('message', messageCallback);
worker.once('error', errorCallback);
worker.postMessage(await queueItem.getData());
}
首先,通过使用传递的workerId
,我们从workersById
状态获得worker引用。然后,在activeWorkersById
内部,我们将[workerId]
属性设置为true,这样我们就知道在worker忙碌时不运行任何其他东西。
接下来,我们分别在message
和error
事件上创建messageCallback
和errorCallback
,然后注册这些函数来监听事件并将数据发送给工作者。
在回调函数中,我们调用queueItem
的回调,然后调用cleanUp
函数。在cleanUp
函数中,我们确保删除事件监听器,因为我们多次重用同一个worker。如果我们不删除监听器,就会出现内存泄漏;实际上,我们会慢慢地耗尽内存。
在activeWorkersById
状态中,我们将[workerId]
属性设置为false
,并检查队列是否为空。如果不是,则从队列中删除第一个项目,并使用另一个queueItem
再次调用worker。
让我们创建一个worker,在接收到message
事件的数据后执行一些计算:
import { isMainThread, parentPort } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data: any) => {
const collection = [];
for (let i = 0; i < 1000000; i += 1) {
collection[i] = Math.round(Math.random() * 100000);
}
return collection.sort((a, b) => {
if (a > b) {
return 1;
}
return -1;
});
};
parentPort.on('message', (data: any) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
worker创建了一个由100万个随机数组成的数组,然后对它们进行排序。只要花点时间完成,会发生什么事无关紧要。
下面是一个简单的使用线程池的例子:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);
const items = [...new Array(100)].fill(null);
Promise.all(
items.map(async (_, i) => {
await pool.run(() => ({ i }));
console.log('finished', i);
}),
).then(() => {
console.log('finished all');
});
我们首先创建一个包含8个worker的线程池。然后我们创建一个包含100个元素的数组,对于每个元素,我们在工作池中运行一个任务。首先,8个任务将被立即执行,其余的将被放入队列并逐步执行。通过使用一个线程池,我们不必每次都创建一个worker,这极大地提高了效率。
总结
worker_threads
为应用程序提供了一种非常简单的方法支持多线程。通过将繁重的CPU计算任务委托给其他线程,我们可以显著提高服务器的吞吐量。有了官方的线程支持,我们可以期待更多来自AI、机器学习和大数据等领域的开发人员和工程师开始使用Node.js。
交流
欢迎关注微信公众号“1024译站”,同步国际最新互联网技术资讯。
![](https://img.haomeiwen.com/i1618526/840b5a6dfebbe564.png)
网友评论