流量削峰
消息队列的一个主要用途是做流量削峰。
比如一台单点服务器,平时的访问量很少,但如果做秒杀活动,流量的突然剧增会让db扛不住压力而崩溃。因此可以维护一个消息队列,出现流量高峰时排队执行即可。
db
数据库用PostgreSQL来玩,在docker里安装一下。
# Publish port 5432 and pre-create the "shop" database, because the Node code below connects to localhost:5432/shop.
docker run --name my-postgis -e POSTGRES_PASSWORD=test_123456 -e POSTGRES_DB=shop -p 5432:5432 -d postgis/postgis
然后安装一个图形化界面navicat,先创建名为shop的数据库(如果还没有的话,后面的代码连接的就是它),再创建几个表。
-- Products table
create table products(id SERIAL,prodname text,isn text,price real,CONSTRAINT products_pkey PRIMARY KEY(id));
-- Orders table
create table goodsorder(id SERIAL,prodid integer,uid integer,amount integer,CONSTRAINT goodsorder_pkey PRIMARY KEY(id));
-- Stock table
create table stocks(id SERIAL,prodid integer,amount integer,CONSTRAINT stocks_pkey PRIMARY KEY(id));
-- Test data: products
insert into products(prodname,isn,price) values('高露洁牙膏','123456789',12.50);
insert into products(prodname,isn,price) values('百草味瓜子','876554439',5.00);
-- Test data: stock
insert into stocks(prodid,amount) values(1,5);
insert into stocks(prodid,amount) values(2,10);
node
安装pg和amqplib两个依赖包(另外需要本地已经运行RabbitMQ,下面的代码连接的是amqp://guest:guest@localhost)。
npm install pg -S
npm install amqplib -S
当用户下订单时,我们用消息队列作缓冲层,只需向mq写入一条数据后即可返回前端,把耗时的db操作排队执行。
代码:
import { Controller, Get, Post, Req, Res } from "@nestjs/common";
import { Pool } from 'pg';
const amqp = require('amqplib/callback_api');
/**
 * Order endpoint that puts RabbitMQ in front of PostgreSQL.
 *
 * `GET /mq/order` only publishes the order to the `shop` exchange and answers
 * immediately. A single long-lived consumer then takes orders off the
 * `customer` queue one at a time and performs the database work.
 */
@Controller("mq")
export class MqController {
  /** PostgreSQL connection pool used by the order consumer. */
  pool: Pool;
  /** AMQP channel; unset until the first request has set up the broker. */
  channel;
  /** Reply of the `customer` queue declaration; `customer.queue` is the queue name. */
  customer;
  /** Shared broker setup, so concurrent first requests initialise only once. */
  private mqReady: Promise<void> | undefined;
  /** True once the long-lived consumer has been registered on the channel. */
  private consumerStarted = false;

  constructor() {
    this.pool = new Pool({
      host: 'localhost',
      port: 5432,
      password: 'test_123456',
      user: 'postgres',
      database: 'shop'
    });
  }

  /**
   * Places an order: publishes it to the queue and replies right away.
   * The stock check and order insert happen later in the consumer, so the
   * response does not say whether the order was actually fulfilled.
   *
   * @param req request whose query string carries `uid`, `prodid` and `amount`
   * @param res response used to send the acknowledgement
   */
  @Get('/order')
  async publishOrder(@Req() req, @Res() res) {
    const orderObj = { uid: req.query.uid, prodid: req.query.prodid, amount: req.query.amount };
    await this.ensureMq();
    // Producer side: hand the order to the broker.
    this.channel.publish('shop', 'order', Buffer.from(JSON.stringify(orderObj)));
    res.send({
      code: 0,
      status: 'success',
      msg: '下单成功'
    });
  }

  /**
   * Starts the single consumer that processes every queued order.
   * Safe to call repeatedly: only the first call registers a consumer, so
   * deliveries are never handed to a consumer that has stopped listening.
   * Registration failures are logged and a later call may retry.
   */
  async consumeOrder() {
    if (this.consumerStarted) {
      return;
    }
    this.consumerStarted = true;
    try {
      const firstOrder = await this.consume((orderObj) => this.processOrder(orderObj));
      console.log('从mq获取的订单信息>>>', firstOrder);
    } catch (error) {
      this.consumerStarted = false;
      console.error('注册消费者失败>>>', error);
    }
  }

  /**
   * Opens a connection to the broker and creates a channel on it.
   *
   * @param url AMQP connection URL
   * @returns promise resolving to the channel; rejects when connecting or
   *          creating the channel fails
   */
  createChannel(url) {
    return new Promise((resolve, reject) => {
      amqp.connect(url, (connectError, conn) => {
        if (connectError) {
          // Stop here: `conn` is not usable when connecting failed.
          reject(connectError);
          return;
        }
        conn.createChannel((channelError, channel) => {
          if (channelError) {
            conn.close(); // do not leak the connection
            reject(channelError);
            return;
          }
          resolve(channel);
        });
      });
    });
  }

  /**
   * Registers a consumer on the `customer` queue.
   *
   * Each message is acknowledged only after `onOrder` has settled. Combined
   * with the prefetch of 1 set during setup, the broker delivers the next
   * order only when the previous one is finished. A message whose handling
   * fails is logged and still acknowledged, so one bad order cannot block
   * the queue.
   *
   * @param onOrder optional handler invoked for every delivered order
   * @returns promise resolving to the first order delivered; rejects if the
   *          consumer cannot be registered
   */
  consume(onOrder?: (orderObj: any) => unknown) {
    return new Promise((resolve, reject) => {
      this.channel.consume(this.customer.queue, async (data) => {
        if (!data) {
          return; // null delivery: the broker cancelled this consumer
        }
        try {
          const raw = data.content.toString();
          console.log('消费订单>>>', raw);
          const orderObj: any = JSON.parse(raw);
          resolve(orderObj); // only the first delivery settles the promise
          if (onOrder) {
            await onOrder(orderObj);
          }
        } catch (error) {
          console.error('处理订单失败>>>', error);
        } finally {
          this.channel.ack(data);
        }
      }, { noAck: false }, (error) => {
        if (error) {
          reject(error);
        }
      });
    });
  }

  /** Sets up the broker topology once and shares the result between requests. */
  private ensureMq(): Promise<void> {
    if (!this.mqReady) {
      this.mqReady = this.setupMq().catch((error) => {
        // Forget the failed attempt so the next request can retry.
        this.mqReady = undefined;
        this.channel = undefined;
        this.customer = undefined;
        this.consumerStarted = false;
        throw error;
      });
    }
    return this.mqReady;
  }

  /** Declares the exchange, queue and binding, then starts the consumer. */
  private async setupMq(): Promise<void> {
    const channel: any = await this.createChannel('amqp://guest:guest@localhost');
    this.channel = channel;
    // amqplib/callback_api reports results through callbacks, so each call is
    // wrapped in a promise instead of being awaited directly.
    await this.rpc((done) => channel.assertExchange('shop', 'direct', { durable: false }, done));
    // Only one consumer queue.
    this.customer = await this.rpc((done) => channel.assertQueue('customer', {}, done));
    await this.rpc((done) => channel.bindQueue(this.customer.queue, 'shop', 'order', {}, done));
    // At most one unacknowledged order at a time: this is what makes the
    // database work queue up instead of hitting PostgreSQL all at once.
    channel.prefetch(1);
    void this.consumeOrder();
  }

  /** Turns one callback-style channel call into a promise. */
  private rpc(invoke: (done: (error: unknown, ok: unknown) => void) => void): Promise<unknown> {
    return new Promise((resolve, reject) => {
      invoke((error, ok) => {
        if (error) {
          reject(error);
        } else {
          resolve(ok);
        }
      });
    });
  }

  /**
   * Fulfils one order: decrements stock and records the order atomically.
   *
   * The stock check and decrement are a single conditional UPDATE, so
   * concurrent orders cannot oversell. The UPDATE and the INSERT share one
   * transaction, so stock is never reduced without an order row. All values
   * are passed as query parameters, never interpolated into the SQL text.
   */
  private async processOrder(orderObj: any): Promise<void> {
    const source = orderObj || {};
    const [prodid, uid, amount] = [source.prodid, source.uid, source.amount].map(Number);
    const valid = [prodid, uid, amount].every((n) => Number.isSafeInteger(n) && n > 0);
    if (!valid) {
      console.warn('订单参数无效>>>', orderObj);
      return;
    }

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      // Matches no row when the product is unknown or the stock is too low.
      const minusRes = await client.query(
        'update stocks set amount = amount - $1 where prodid = $2 and amount >= $1',
        [amount, prodid]
      );
      console.log('减掉库存>>>', minusRes);
      if (!minusRes.rowCount) {
        await client.query('ROLLBACK');
        console.log('库存不足>>>', orderObj);
        return;
      }
      const result = await client.query(
        'insert into goodsorder(prodid,uid,amount) values($1,$2,$3)',
        [prodid, uid, amount]
      );
      console.log('生成订单>>>', result);
      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // The connection is already broken; nothing left to roll back.
      }
      throw error;
    } finally {
      client.release();
    }
  }
}
代码看着有点乱,稍微优化一下:把mq的初始化和消息发布抽成独立的方法,消费者只在初始化时注册一次。
import { Controller, Get, Post, Req, Res } from "@nestjs/common";
import { Pool } from 'pg';
const amqp = require('amqplib/callback_api');
/**
 * Order endpoint that puts RabbitMQ in front of PostgreSQL.
 *
 * `GET /mq/order` only publishes the order to the `shop` exchange and answers
 * immediately. A single long-lived consumer then takes orders off the
 * `customer` queue one at a time and performs the database work.
 */
@Controller("mq")
export class MqController {
  /** PostgreSQL connection pool used by the order consumer. */
  pool: Pool;
  /** AMQP channel; unset until `initMq` has set up the broker. */
  channel;
  /** Reply of the `customer` queue declaration; `customer.queue` is the queue name. */
  customer;
  /** Shared broker setup, so concurrent first requests initialise only once. */
  private mqReady: Promise<void> | undefined;
  /** True once the long-lived consumer has been registered on the channel. */
  private consumerStarted = false;

  constructor() {
    // Database initialisation.
    this.pool = new Pool({
      host: 'localhost',
      port: 5432,
      password: 'test_123456',
      user: 'postgres',
      database: 'shop'
    });
  }

  /**
   * Sets up the broker (channel, exchange, queue, binding, consumer) once.
   * Concurrent callers share the same in-flight setup. A failed setup is
   * forgotten so that a later call can retry.
   */
  async initMq() {
    if (!this.mqReady) {
      this.mqReady = this.setupMq().catch((error) => {
        this.mqReady = undefined;
        this.channel = undefined;
        this.customer = undefined;
        this.consumerStarted = false;
        throw error;
      });
    }
    return this.mqReady;
  }

  /**
   * Publishes one order to the `shop` exchange with routing key `order`.
   * `initMq` must have completed before this is called.
   *
   * @param orderObj order payload; serialised as JSON
   */
  publishMessage(orderObj) {
    this.channel.publish('shop', 'order', Buffer.from(JSON.stringify(orderObj)));
  }

  /**
   * Places an order: publishes it to the queue and replies right away.
   * The stock check and order insert happen later in the consumer, so the
   * response does not say whether the order was actually fulfilled.
   *
   * @param req request whose query string carries `uid`, `prodid` and `amount`
   * @param res response used to send the acknowledgement
   */
  @Get('/order')
  async publishOrder(@Req() req, @Res() res) {
    const orderObj = { uid: req.query.uid, prodid: req.query.prodid, amount: req.query.amount };
    await this.initMq();
    // Publish one message to the broker.
    this.publishMessage(orderObj);
    res.send({
      code: 0,
      status: 'success',
      msg: '下单成功'
    });
  }

  /**
   * Starts the single consumer that processes every queued order; nothing is
   * reported back to the HTTP client from here.
   * Safe to call repeatedly: only the first call registers a consumer.
   * Registration failures are logged and a later call may retry.
   */
  async consumeOrder() {
    if (this.consumerStarted) {
      return;
    }
    this.consumerStarted = true;
    try {
      const firstOrder = await this.consume((orderObj) => this.processOrder(orderObj));
      console.log('从mq获取的订单信息>>>', firstOrder);
    } catch (error) {
      this.consumerStarted = false;
      console.error('注册消费者失败>>>', error);
    }
  }

  /**
   * Opens a connection to the broker and creates a channel on it.
   *
   * @param url AMQP connection URL
   * @returns promise resolving to the channel; rejects when connecting or
   *          creating the channel fails
   */
  createChannel(url) {
    return new Promise((resolve, reject) => {
      amqp.connect(url, (connectError, conn) => {
        if (connectError) {
          // Stop here: `conn` is not usable when connecting failed.
          reject(connectError);
          return;
        }
        conn.createChannel((channelError, channel) => {
          if (channelError) {
            conn.close(); // do not leak the connection
            reject(channelError);
            return;
          }
          resolve(channel);
        });
      });
    });
  }

  /**
   * Registers a consumer on the `customer` queue.
   *
   * Unlike a one-shot promise, `onOrder` runs for every delivery, so orders
   * after the first one are processed too. Each message is acknowledged only
   * after `onOrder` has settled. Combined with the prefetch of 1 set during
   * setup, the broker delivers the next order only when the previous one is
   * finished. A message whose handling fails is logged and still
   * acknowledged, so one bad order cannot block the queue.
   *
   * @param onOrder optional handler invoked for every delivered order
   * @returns promise resolving to the first order delivered; rejects if the
   *          consumer cannot be registered
   */
  consume(onOrder?: (orderObj: any) => unknown) {
    return new Promise((resolve, reject) => {
      this.channel.consume(this.customer.queue, async (data) => {
        if (!data) {
          return; // null delivery: the broker cancelled this consumer
        }
        try {
          const raw = data.content.toString();
          console.log('消费订单>>>', raw);
          const orderObj: any = JSON.parse(raw);
          resolve(orderObj); // only the first delivery settles the promise
          if (onOrder) {
            await onOrder(orderObj);
          }
        } catch (error) {
          console.error('处理订单失败>>>', error);
        } finally {
          this.channel.ack(data);
        }
      }, { noAck: false }, (error) => {
        if (error) {
          reject(error);
        }
      });
    });
  }

  /** Declares the exchange, queue and binding, then starts the consumer. */
  private async setupMq(): Promise<void> {
    const channel: any = await this.createChannel('amqp://guest:guest@localhost');
    this.channel = channel;
    // amqplib/callback_api reports results through callbacks, so each call is
    // wrapped in a promise instead of being awaited directly.
    await this.rpc((done) => channel.assertExchange('shop', 'direct', { durable: false }, done));
    // Only one consumer queue.
    this.customer = await this.rpc((done) => channel.assertQueue('customer', {}, done));
    await this.rpc((done) => channel.bindQueue(this.customer.queue, 'shop', 'order', {}, done));
    // At most one unacknowledged order at a time: this is what makes the
    // database work queue up instead of hitting PostgreSQL all at once.
    channel.prefetch(1);
    void this.consumeOrder();
  }

  /** Turns one callback-style channel call into a promise. */
  private rpc(invoke: (done: (error: unknown, ok: unknown) => void) => void): Promise<unknown> {
    return new Promise((resolve, reject) => {
      invoke((error, ok) => {
        if (error) {
          reject(error);
        } else {
          resolve(ok);
        }
      });
    });
  }

  /**
   * Fulfils one order: decrements stock and records the order atomically.
   *
   * The stock check and decrement are a single conditional UPDATE, so
   * concurrent orders cannot oversell. The UPDATE and the INSERT share one
   * transaction, so stock is never reduced without an order row. All values
   * are passed as query parameters, never interpolated into the SQL text.
   */
  private async processOrder(orderObj: any): Promise<void> {
    const source = orderObj || {};
    const [prodid, uid, amount] = [source.prodid, source.uid, source.amount].map(Number);
    const valid = [prodid, uid, amount].every((n) => Number.isSafeInteger(n) && n > 0);
    if (!valid) {
      console.warn('订单参数无效>>>', orderObj);
      return;
    }

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      // Matches no row when the product is unknown or the stock is too low.
      const minusRes = await client.query(
        'update stocks set amount = amount - $1 where prodid = $2 and amount >= $1',
        [amount, prodid]
      );
      console.log('减掉库存>>>', minusRes);
      if (!minusRes.rowCount) {
        await client.query('ROLLBACK');
        console.log('库存不足>>>', orderObj);
        return;
      }
      const result = await client.query(
        'insert into goodsorder(prodid,uid,amount) values($1,$2,$3)',
        [prodid, uid, amount]
      );
      console.log('生成订单>>>', result);
      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // The connection is already broken; nothing left to roll back.
      }
      throw error;
    } finally {
      client.release();
    }
  }
}
网友评论