美文网首页
Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

作者: rollingstarky | 来源:发表于2023-01-11 20:38 被阅读0次
Distributing tasks to a set of consumers

将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumersfanout distributionventilator
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。

A messaging pipeline

ZeroMQ Fanout/Fanin 模式

分布式 hashsum 破解器

需要以下组件实现一个标准的并行管线:

  • 一个协调节点负责在多个工作节点间分发任务
  • 多个工作节点承担具体的计算任务
  • 一个用于收集计算结果的节点
The architecture of a typical pipeline with ZeroMQ

即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。

实现 producer

为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 ab 两个字母的话,长度为 3 的字符串组合共有图示的以下几种:

Indexed n-ary tree for alphabet (a, b)

indexed-string-variation 包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:

export function* generateTasks(searchHash, alphabet,
    maxWordLength, batchSize) {
    let nVariations = 0
    for (let n = 1; n <= maxWordLength; n++) {
        nVariations += Math.pow(alphabet.length, n)
    }

    console.log('Finding the hashsum source string over ' +
        `${nVariations} possible variations`)

    let batchStart = 1
    while (batchStart <= nVariations) {
        const batchEnd = Math.min(
            batchStart + batchSize - 1, nVariations)
        yield {
            searchHash,
            alphabet: alphabet,
            batchStart,
            batchEnd
        }

        batchStart = batchEnd + 1
    }
}

producer.js:

import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const ventilator = new zmq.Push()
    await ventilator.bind('tcp://*:5016')
    await delay(1000)

    const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await ventilator.send(JSON.stringify(task))
    }
}

main().catch(err => console.log(err))
  • 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
  • 将每一个生成的任务字符串化,通过 PUSH socket 的 send() 方法发送给工作节点。工作节点以轮询的方式接收不同的任务
实现 worker

process Task.js:

import isv from 'indexed-string-variation'
import { createHash } from 'crypto'

export function processTask(task) {
    const variationGen = isv.generator(task.alphabet)
    console.log('processing from ' +
        `${variationGen(task.batchStart)} (${task.batchStart})` +
        `to ${variationGen(task.batchEnd)} (${task.batchEnd}`)

    for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
        const word = variationGen(idx)
        const shasum = createHash('sha1')
        shasum.update(word)
        const digest = shasum.digest('hex')

        if (digest === task.searchHash) {
            return word
        }
    }
}

processTask() 遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task 对象中的 searchHash 比较。

worker.js:

import zmq from 'zeromq'
import { processTask } from './processTask.js'

async function main() {
    const fromVentilator = new zmq.Pull()
    const toSink = new zmq.Push()

    fromVentilator.connect('tcp://localhost:5016')
    toSink.connect('tcp://localhost:5017')

    for await (const rawMessage of fromVentilator) {
        const found = processTask(JSON.parse(rawMessage.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await toSink.send(`Found: $found`)
        }
    }
}

main().catch(err => console.error(err))

worker.js 创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。

实现 results collector

collector.js:

import zmq from 'zeromq'

async function main() {
    const sink = new zmq.Pull()
    await sink.bind('tcp://*:5017')

    for await (const rawMessage of sink) {
        console.log('Message from worker: ', rawMessage.toString())
    }
}

main().catch(err => console.error(err))

运行以下命令测试结果:

node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

AMQP 实现 pipeline 和 competing consumers

Task distribution architecture using a message queue broker

像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。

hashsum 破解器的 AMQP 实现

producer-amqp.js:

import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000

const [, , maxLength, searchHash] = process.argv

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createConfirmChannel()
    await channel.assertQueue('tasks_queue')

    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
    }

    await channel.waitForConfirms()
    channel.close()
    connection.close()
}

main().catch(err => console.error(err))
  • 此处创建的是一个 confirmChannel,它提供了一个 waitForConfirms() 函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接
  • channel.sendToQueue() 负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由

worker-amqp.js:

import amqp from 'amqplib'
import { processTask } from './processTask.js'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('tasks_queue')
    channel.consume(queue, async (rawMessage) => {
        const found = processTask(
            JSON.parse(rawMessage.content.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await channel.sendToQueue('results_queue',
                Buffer.from(`Found: ${found}`))
        }
        await channel.ack(rawMessage)
    })
}

main().catch(err => console.error(err))

collector-amqp.js:

import amqp from 'amqplib'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const { queue } = await channel.assertQueue('results_queue')
    channel.consume(queue, msg => {
        console.log(`Message from worker: ${msg.content.toString()}`)
    })
}

main().catch(err => console.error(err))

运行如下命令测试效果:

node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

通过 Redis Streams 实现任务分发

Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。

A Redis Stream consumer group

Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。

producer-redis.js:

import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()

const [, , maxLength, searchHash] = process.argv

async function main() {
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await redisClient.xadd('tasks_stream', '*',
            'task', JSON.stringify(task))
    }

    redisClient.disconnect()
}

main().catch(err => console.error(err))

worker-redis.js:

import Redis from 'ioredis'
import { processTask } from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
    await redisClient.xgroup('CREATE', 'tasks_stream',
        'workers_group', '$', 'MKSTREAM')
        .catch(() => console.log('Consumer group already exists'))

    const [[, records]] = await redisClient.xreadgroup(
        'GROUP', 'workers_group', consumerName, 'STREAMS',
        'tasks_stream', '0')
    for (const [recordId, [, rawTask]] of records) {
        await processAndAck(recordId, rawTask)
    }

    while (true) {
        const [[, records]] = await redisClient.xreadgroup(
            'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
            'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
        for (const [recordId, [, rawTask]] of records) {
            await processAndAck(recordId, rawTask)
        }
    }
}

async function processAndAck(recordId, rawTask) {
    const found = processTask(JSON.parse(rawTask))
    if (found) {
        console.log(`Found! => ${found}`)
        await redisClient.xadd('results_stream', '*', 'result',
            `Found: ${found}`)
    }

    await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))
  • xgroup 命令用来确保 consumer group 存在。
    • CREATE 表示我们希望创建一个 consumer group
    • tasks_stream 表示我们想要读取的 stream 的名字
    • workers_group 是 consumer group 的名字
    • 第四个参数表示 consumer group 开始读取的记录的位置。$ 表示当前 stream 中最后一条记录的 ID
    • MKSTREAM 表示如果 stream 不存在则创建它
  • 通过 xreadgroup 命令读取属于当前 consumer 的所有 pending 的记录。
    • 'GROUP''workers_group'consumerName 用来指代 consumer group 和 consumer 的名字
    • STREAMStasks_stream 用来指代我们想要读取的 stream 的名字
    • 0 用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
  • 通过另外一条 xreadgroup 命令读取 stream 里新增加的记录。
    • 'BLOCK''0' 两个参数表示如果没有新的消息,就一直阻塞等待。'0' 具体表示一直等待永不超时
    • 'COUNT''1' 表示一次请求只获取一条记录
    • 特殊 ID > 表示只获取还没有被当前的 consumer group 处理过的消息
  • processAndAck() 函数负责当 xreadgroup() 返回的记录被处理完成时,调用 xack 命令进行确认,将该记录从当前 consumer 的 pending 列表里移除

collector-redis.js:

import Redis from 'ioredis'

const redisClient = new Redis()

async function main() {
    let lastRecordId = '$'
    while (true) {
        const data = await redisClient.xread(
            'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
        for (const [, logs] of data) {
            for (const [recordId, [, message]] of logs) {
                console.log(`Message from worker: ${message}`)
                lastRecordId = recordId
            }
        }
    }
}

main().catch(err => console.error(err))

运行程序测试效果:

node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

参考资料

Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

相关文章

网友评论

      本文标题:Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

      本文链接:https://www.haomeiwen.com/subject/njbbcdtx.html