美文网首页
Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

作者: rollingstarky | 来源:发表于2023-01-11 20:38 被阅读0次
    Distributing tasks to a set of consumers

    将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumersfanout distributionventilator
    与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
    此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。

    A messaging pipeline

    ZeroMQ Fanout/Fanin 模式

    分布式 hashsum 破解器

    需要以下组件实现一个标准的并行管线:

    • 一个协调节点负责在多个工作节点间分发任务
    • 多个工作节点承担具体的计算任务
    • 一个用于收集计算结果的节点
    The architecture of a typical pipeline with ZeroMQ

    即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。

    实现 producer

    为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 ab 两个字母的话,长度为 3 的字符串组合共有图示的以下几种:

    Indexed n-ary tree for alphabet (a, b)

    indexed-string-variation 包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
    generateTasks.js:

    export function* generateTasks(searchHash, alphabet,
        maxWordLength, batchSize) {
        let nVariations = 0
        for (let n = 1; n <= maxWordLength; n++) {
            nVariations += Math.pow(alphabet.length, n)
        }
    
        console.log('Finding the hashsum source string over ' +
            `${nVariations} possible variations`)
    
        let batchStart = 1
        while (batchStart <= nVariations) {
            const batchEnd = Math.min(
                batchStart + batchSize - 1, nVariations)
            yield {
                searchHash,
                alphabet: alphabet,
                batchStart,
                batchEnd
            }
    
            batchStart = batchEnd + 1
        }
    }
    

    producer.js:

    import zmq from 'zeromq'
    import delay from 'delay'
    import { generateTasks } from './generateTasks.js'
    
    const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
    const BATCH_SIZE = 10000
    
    const [, , maxLength, searchHash] = process.argv
    
    async function main() {
        const ventilator = new zmq.Push()
        await ventilator.bind('tcp://*:5016')
        await delay(1000)
    
        const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
        for (const task of generatorObj) {
            await ventilator.send(JSON.stringify(task))
        }
    }
    
    main().catch(err => console.log(err))
    
    • 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
    • 将每一个生成的任务字符串化,通过 PUSH socket 的 send() 方法发送给工作节点。工作节点以轮询的方式接收不同的任务
    实现 worker

    process Task.js:

    import isv from 'indexed-string-variation'
    import { createHash } from 'crypto'
    
    export function processTask(task) {
        const variationGen = isv.generator(task.alphabet)
        console.log('processing from ' +
            `${variationGen(task.batchStart)} (${task.batchStart})` +
            `to ${variationGen(task.batchEnd)} (${task.batchEnd}`)
    
        for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
            const word = variationGen(idx)
            const shasum = createHash('sha1')
            shasum.update(word)
            const digest = shasum.digest('hex')
    
            if (digest === task.searchHash) {
                return word
            }
        }
    }
    

    processTask() 遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task 对象中的 searchHash 比较。

    worker.js:

    import zmq from 'zeromq'
    import { processTask } from './processTask.js'
    
    async function main() {
        const fromVentilator = new zmq.Pull()
        const toSink = new zmq.Push()
    
        fromVentilator.connect('tcp://localhost:5016')
        toSink.connect('tcp://localhost:5017')
    
        for await (const rawMessage of fromVentilator) {
            const found = processTask(JSON.parse(rawMessage.toString()))
            if (found) {
                console.log(`Found! => ${found}`)
                await toSink.send(`Found: $found`)
            }
        }
    }
    
    main().catch(err => console.error(err))
    

    worker.js 创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。

    实现 results collector

    collector.js:

    import zmq from 'zeromq'
    
    async function main() {
        const sink = new zmq.Pull()
        await sink.bind('tcp://*:5017')
    
        for await (const rawMessage of sink) {
            console.log('Message from worker: ', rawMessage.toString())
        }
    }
    
    main().catch(err => console.error(err))
    

    运行以下命令测试结果:

    node worker.js
    node worker.js
    node collector.js
    node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
    

    AMQP 实现 pipeline 和 competing consumers

    Task distribution architecture using a message queue broker

    像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
    可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。

    hashsum 破解器的 AMQP 实现

    producer-amqp.js:

    import amqp from 'amqplib'
    import { generateTasks } from './generateTasks.js'
    
    const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
    const BATCH_SIZE = 10000
    
    const [, , maxLength, searchHash] = process.argv
    
    async function main() {
        const connection = await amqp.connect('amqp://localhost')
        const channel = await connection.createConfirmChannel()
        await channel.assertQueue('tasks_queue')
    
        const generatorObj = generateTasks(searchHash, ALPHABET,
            maxLength, BATCH_SIZE)
        for (const task of generatorObj) {
            channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
        }
    
        await channel.waitForConfirms()
        channel.close()
        connection.close()
    }
    
    main().catch(err => console.error(err))
    
    • 此处创建的是一个 confirmChannel,它提供了一个 waitForConfirms() 函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接
    • channel.sendToQueue() 负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由

    worker-amqp.js:

    import amqp from 'amqplib'
    import { processTask } from './processTask.js'
    
    async function main() {
        const connection = await amqp.connect('amqp://localhost')
        const channel = await connection.createChannel()
        const { queue } = await channel.assertQueue('tasks_queue')
        channel.consume(queue, async (rawMessage) => {
            const found = processTask(
                JSON.parse(rawMessage.content.toString()))
            if (found) {
                console.log(`Found! => ${found}`)
                await channel.sendToQueue('results_queue',
                    Buffer.from(`Found: ${found}`))
            }
            await channel.ack(rawMessage)
        })
    }
    
    main().catch(err => console.error(err))
    

    collector-amqp.js:

    import amqp from 'amqplib'
    
    async function main() {
        const connection = await amqp.connect('amqp://localhost')
        const channel = await connection.createChannel()
        const { queue } = await channel.assertQueue('results_queue')
        channel.consume(queue, msg => {
            console.log(`Message from worker: ${msg.content.toString()}`)
        })
    }
    
    main().catch(err => console.error(err))
    

    运行如下命令测试效果:

    node worker-amqp.js
    node worker-amqp.js
    node collector-amqp.js
    node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
    

    通过 Redis Streams 实现任务分发

    Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
    每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。

    A Redis Stream consumer group

    Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。

    producer-redis.js:

    import Redis from 'ioredis'
    import { generateTasks } from './generateTasks.js'
    
    const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
    const BATCH_SIZE = 10000
    const redisClient = new Redis()
    
    const [, , maxLength, searchHash] = process.argv
    
    async function main() {
        const generatorObj = generateTasks(searchHash, ALPHABET,
            maxLength, BATCH_SIZE)
        for (const task of generatorObj) {
            await redisClient.xadd('tasks_stream', '*',
                'task', JSON.stringify(task))
        }
    
        redisClient.disconnect()
    }
    
    main().catch(err => console.error(err))
    

    worker-redis.js:

    import Redis from 'ioredis'
    import { processTask } from './processTask.js'
    
    const redisClient = new Redis()
    const [, , consumerName] = process.argv
    
    async function main() {
        await redisClient.xgroup('CREATE', 'tasks_stream',
            'workers_group', '$', 'MKSTREAM')
            .catch(() => console.log('Consumer group already exists'))
    
        const [[, records]] = await redisClient.xreadgroup(
            'GROUP', 'workers_group', consumerName, 'STREAMS',
            'tasks_stream', '0')
        for (const [recordId, [, rawTask]] of records) {
            await processAndAck(recordId, rawTask)
        }
    
        while (true) {
            const [[, records]] = await redisClient.xreadgroup(
                'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
                'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
            for (const [recordId, [, rawTask]] of records) {
                await processAndAck(recordId, rawTask)
            }
        }
    }
    
    async function processAndAck(recordId, rawTask) {
        const found = processTask(JSON.parse(rawTask))
        if (found) {
            console.log(`Found! => ${found}`)
            await redisClient.xadd('results_stream', '*', 'result',
                `Found: ${found}`)
        }
    
        await redisClient.xack('tasks_stream', 'workers_group', recordId)
    }
    
    main().catch(err => console.error(err))
    
    • xgroup 命令用来确保 consumer group 存在。
      • CREATE 表示我们希望创建一个 consumer group
      • tasks_stream 表示我们想要读取的 stream 的名字
      • workers_group 是 consumer group 的名字
      • 第四个参数表示 consumer group 开始读取的记录的位置。$ 表示当前 stream 中最后一条记录的 ID
      • MKSTREAM 表示如果 stream 不存在则创建它
    • 通过 xreadgroup 命令读取属于当前 consumer 的所有 pending 的记录。
      • 'GROUP''workers_group'consumerName 用来指代 consumer group 和 consumer 的名字
      • STREAMStasks_stream 用来指代我们想要读取的 stream 的名字
      • 0 用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
    • 通过另外一条 xreadgroup 命令读取 stream 里新增加的记录。
      • 'BLOCK''0' 两个参数表示如果没有新的消息,就一直阻塞等待。'0' 具体表示一直等待永不超时
      • 'COUNT''1' 表示一次请求只获取一条记录
      • 特殊 ID > 表示只获取还没有被当前的 consumer group 处理过的消息
    • processAndAck() 函数负责当 xreadgroup() 返回的记录被处理完成时,调用 xack 命令进行确认,将该记录从当前 consumer 的 pending 列表里移除

    collector-redis.js:

    import Redis from 'ioredis'
    
    const redisClient = new Redis()
    
    async function main() {
        let lastRecordId = '$'
        while (true) {
            const data = await redisClient.xread(
                'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
            for (const [, logs] of data) {
                for (const [recordId, [, message]] of logs) {
                    console.log(`Message from worker: ${message}`)
                    lastRecordId = recordId
                }
            }
        }
    }
    
    main().catch(err => console.error(err))
    

    运行程序测试效果:

    node worker-redis.js workerA
    node worker-redis.js workerB
    node collector-redis.js
    node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
    

    参考资料

    Node.js Design Patterns: Design and implement production-grade Node.js applications using proven patterns and techniques, 3rd Edition

    相关文章

      网友评论

          本文标题:Node.js 设计模式笔记 —— 消息中间件及其应用模式(任务

          本文链接:https://www.haomeiwen.com/subject/njbbcdtx.html