将高成本的任务委派给多个工作节点,这种类型的应用并不适合由 Pub/Sub 模式实现。因为我们并不想同一个任务被多个消费者收到,相反我们更需要一种类似负载均衡的消息分发模式。在消息系统术语中,也被称为 competing consumers,fanout distribution 或 ventilator。
与 HTTP 负载均衡器不同的是,任务分发系统中的消费者是一种更活跃的角色。绝大多数时候都是消费者连接到任务队列,请求新的任务。这一点在可扩展系统中非常关键,允许我们在不修改生产者部分的情况下,直接平滑地增加工作节点的数量。
此外,在一个通用的消息系统中,我们没有必要强调生产者和消费者之间的请求/响应通信。多数情况下,更优先的选择是使用单向的异步通信,从而获得更优异的并行能力和扩展性。消息基本上总是沿着一个方向流动,这样的管道允许我们构建复杂的信息处理架构,又不必承受同步通信带来的开销。
ZeroMQ Fanout/Fanin 模式
分布式 hashsum 破解器
需要以下组件实现一个标准的并行管线:
- 一个协调节点负责在多个工作节点间分发任务
- 多个工作节点承担具体的计算任务
- 一个用于收集计算结果的节点
即一个节点负责生成所有可能的字符串组合,并将它们分发给不同的工作节点;工作节点则负责计算接收到的字符串,比较 hash 值;最后一个节点负责收集暴力破解的结果。
实现 producer
为了表示所有可能的字符组合,这里使用 N 维索引树。每个节点包含一个当前位置下可能出现的字母,比如只有 a
、b
两个字母的话,长度为 3 的字符串组合共有图示的以下几种:
indexed-string-variation
包可以帮助我们由索引计算出对应的字符串,这项工作可以在工作节点完成,因此 producer 这里只需要将分好组的索引值分发给工作节点。
generateTasks.js:
export function* generateTasks(searchHash, alphabet,
maxWordLength, batchSize) {
let nVariations = 0
for (let n = 1; n <= maxWordLength; n++) {
nVariations += Math.pow(alphabet.length, n)
}
console.log('Finding the hashsum source string over ' +
`${nVariations} possible variations`)
let batchStart = 1
while (batchStart <= nVariations) {
const batchEnd = Math.min(
batchStart + batchSize - 1, nVariations)
yield {
searchHash,
alphabet: alphabet,
batchStart,
batchEnd
}
batchStart = batchEnd + 1
}
}
producer.js:
import zmq from 'zeromq'
import delay from 'delay'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const ventilator = new zmq.Push()
await ventilator.bind('tcp://*:5016')
await delay(1000)
const generatorObj = generateTasks(searchHash, ALPHABET, maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await ventilator.send(JSON.stringify(task))
}
}
main().catch(err => console.log(err))
- 创建一个 PUSH socket 并绑定给本地的 5016 端口,工作节点的 PULL socket 会连接到此端口并接收任务
- 将每一个生成的任务字符串化,通过 PUSH socket 的
send()
方法发送给工作节点。工作节点以轮询的方式接收不同的任务
实现 worker
process Task.js:
import isv from 'indexed-string-variation'
import { createHash } from 'crypto'
export function processTask(task) {
const variationGen = isv.generator(task.alphabet)
console.log('processing from ' +
`${variationGen(task.batchStart)} (${task.batchStart})` +
`to ${variationGen(task.batchEnd)} (${task.batchEnd}`)
for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
const word = variationGen(idx)
const shasum = createHash('sha1')
shasum.update(word)
const digest = shasum.digest('hex')
if (digest === task.searchHash) {
return word
}
}
}
processTask()
遍历给定区间内的所有索引值,对每一个索引生成对应的字符串,再计算其 SHA1 值,与传入的 task
对象中的 searchHash
比较。
worker.js:
import zmq from 'zeromq'
import { processTask } from './processTask.js'
async function main() {
const fromVentilator = new zmq.Pull()
const toSink = new zmq.Push()
fromVentilator.connect('tcp://localhost:5016')
toSink.connect('tcp://localhost:5017')
for await (const rawMessage of fromVentilator) {
const found = processTask(JSON.parse(rawMessage.toString()))
if (found) {
console.log(`Found! => ${found}`)
await toSink.send(`Found: $found`)
}
}
}
main().catch(err => console.error(err))
worker.js
创建了两个 socket。PULL socket 负责连接到任务发布方(Ventilator),接收任务;PUSH socket 负责连接到结果收集方(sink),传递任务执行的结果。
实现 results collector
collector.js:
import zmq from 'zeromq'
async function main() {
const sink = new zmq.Pull()
await sink.bind('tcp://*:5017')
for await (const rawMessage of sink) {
console.log('Message from worker: ', rawMessage.toString())
}
}
main().catch(err => console.error(err))
运行以下命令测试结果:
node worker.js
node worker.js
node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
AMQP 实现 pipeline 和 competing consumers
Task distribution architecture using a message queue broker像前面那样在点对点的模式下,实现 pipeline 是非常直观的。假设我们需要借助 AMQP 这类系统实现任务分配模式,就必须确保每条消息都只会被一个消费者接收到。
可以直接将任务发布到目标 queue,不经过 exchange。避免了 exchange 有可能绑定了多个 queue 的情况。之后,多个消费者同时监听这一个 queue,消息即会以 fanout 的方式均匀地分发给所有的消费者。
hashsum 破解器的 AMQP 实现
producer-amqp.js:
import amqp from 'amqplib'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createConfirmChannel()
await channel.assertQueue('tasks_queue')
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
channel.sendToQueue('tasks_queue', Buffer.from(JSON.stringify(task)))
}
await channel.waitForConfirms()
channel.close()
connection.close()
}
main().catch(err => console.error(err))
- 此处创建的是一个
confirmChannel
,它提供了一个waitForConfirms()
函数,可以在 broker 确认收到消息前等待,确保应用不会过早地关闭到 broker 的连接 -
channel.sendToQueue()
负责将一条消息直接发送给某个 queue,跳过任何 exchange 或者路由
worker-amqp.js:
import amqp from 'amqplib'
import { processTask } from './processTask.js'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('tasks_queue')
channel.consume(queue, async (rawMessage) => {
const found = processTask(
JSON.parse(rawMessage.content.toString()))
if (found) {
console.log(`Found! => ${found}`)
await channel.sendToQueue('results_queue',
Buffer.from(`Found: ${found}`))
}
await channel.ack(rawMessage)
})
}
main().catch(err => console.error(err))
collector-amqp.js:
import amqp from 'amqplib'
async function main() {
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const { queue } = await channel.assertQueue('results_queue')
channel.consume(queue, msg => {
console.log(`Message from worker: ${msg.content.toString()}`)
})
}
main().catch(err => console.error(err))
运行如下命令测试效果:
node worker-amqp.js
node worker-amqp.js
node collector-amqp.js
node producer-amqp.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
通过 Redis Streams 实现任务分发
Redis Stream 可以借助一种叫做 consumer groups 的特性实现任务分发模式。Consumer group 是一个有状态的实体,由一组名称标识的消费者组成,组中的消费者会以 round-robin 的方式接收记录。
每条记录都必须被显式地确认,否则该记录会一直处于 pending 状态。每个消费者都只能访问它自己的 pending 记录,假如消费者突然崩溃,在其回到线上后会先尝试获取其 pending 的记录。
Consumer group 也会记录其读取的上一条消息的 ID,因而在连续的读取操作中,consumer group 知道下一条要读取的记录时是哪个。
producer-redis.js:
import Redis from 'ioredis'
import { generateTasks } from './generateTasks.js'
const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()
const [, , maxLength, searchHash] = process.argv
async function main() {
const generatorObj = generateTasks(searchHash, ALPHABET,
maxLength, BATCH_SIZE)
for (const task of generatorObj) {
await redisClient.xadd('tasks_stream', '*',
'task', JSON.stringify(task))
}
redisClient.disconnect()
}
main().catch(err => console.error(err))
worker-redis.js:
import Redis from 'ioredis'
import { processTask } from './processTask.js'
const redisClient = new Redis()
const [, , consumerName] = process.argv
async function main() {
await redisClient.xgroup('CREATE', 'tasks_stream',
'workers_group', '$', 'MKSTREAM')
.catch(() => console.log('Consumer group already exists'))
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'STREAMS',
'tasks_stream', '0')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
for (const [recordId, [, rawTask]] of records) {
await processAndAck(recordId, rawTask)
}
}
}
async function processAndAck(recordId, rawTask) {
const found = processTask(JSON.parse(rawTask))
if (found) {
console.log(`Found! => ${found}`)
await redisClient.xadd('results_stream', '*', 'result',
`Found: ${found}`)
}
await redisClient.xack('tasks_stream', 'workers_group', recordId)
}
main().catch(err => console.error(err))
-
xgroup
命令用来确保 consumer group 存在。-
CREATE
表示我们希望创建一个 consumer group -
tasks_stream
表示我们想要读取的 stream 的名字 -
workers_group
是 consumer group 的名字 - 第四个参数表示 consumer group 开始读取的记录的位置。
$
表示当前 stream 中最后一条记录的 ID -
MKSTREAM
表示如果 stream 不存在则创建它
-
- 通过
xreadgroup
命令读取属于当前 consumer 的所有 pending 的记录。-
'GROUP'
、'workers_group'
、consumerName
用来指代 consumer group 和 consumer 的名字 -
STREAMS
和tasks_stream
用来指代我们想要读取的 stream 的名字 -
0
用来表示我们想要开始读取的记录的位置。这里表示从属于当前 consumer 的第一条记录开始,读取所有 pending 的消息
-
- 通过另外一条
xreadgroup
命令读取 stream 里新增加的记录。-
'BLOCK'
和'0'
两个参数表示如果没有新的消息,就一直阻塞等待。'0'
具体表示一直等待永不超时 -
'COUNT'
和'1'
表示一次请求只获取一条记录 - 特殊 ID
>
表示只获取还没有被当前的 consumer group 处理过的消息
-
-
processAndAck()
函数负责当xreadgroup()
返回的记录被处理完成时,调用xack
命令进行确认,将该记录从当前 consumer 的 pending 列表里移除
collector-redis.js:
import Redis from 'ioredis'
const redisClient = new Redis()
async function main() {
let lastRecordId = '$'
while (true) {
const data = await redisClient.xread(
'BLOCK', '0', 'STREAMS', 'results_stream', lastRecordId)
for (const [, logs] of data) {
for (const [recordId, [, message]] of logs) {
console.log(`Message from worker: ${message}`)
lastRecordId = recordId
}
}
}
}
main().catch(err => console.error(err))
运行程序测试效果:
node worker-redis.js workerA
node worker-redis.js workerB
node collector-redis.js
node producer-redis.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b
网友评论