关键词:AWS API Gateway WebSockets Example
![](https://img.haomeiwen.com/i6151220/cdda1333d534f980.png)
需求背景
基于AWS APIGateway的微服务架构下,需要实现 “服务器端到客户端” 的通知推送实时交互功能。
AWS APIGateway RestfulAPI 是HTTP协议的,而 “服务器端到客户端” 采用的Websocket是TCP协议的。
由于协议问题,AWS APIGateway RestfulAPI 被PASS,后发现AWS APIGateway WebsocketAPI 已悄悄上线,故决定使用AWS APIGateway WebsocketAPI搭建一个实时聊天系统,从而确定该方案的可行性。
架构方案
采用 API Gateway Websocket API +Lambda + DynamoDB 搭建一个实时聊天程序,如下为基础架构图:
![](https://img.haomeiwen.com/i6151220/2c300202229ab412.png)
在我们的应用程序中,设备将连接到API网关。当设备连接时,lambda函数将在DynamoDB表中保存连接ID。当设备断开连接时,另一个lambda函数将在DynamoDB表中移除连接ID。在我们想要将消息发送回设备的实例中,第三个lambda函数将使用回调URL将连接ID和POST数据发送回设备。
实现步骤
Step1 创建Gateway WebSocket API
登陆AWS Console 转到Amazon API Gateway服务,单击WebSocket以创建WebSocket API,提供API名称和路径选择表达式。在示例中,添加 $request.body.action作为选择表达式并点击Create API。
![](https://img.haomeiwen.com/i6151220/db9d13997e94d390.png)
![](https://img.haomeiwen.com/i6151220/daffeea03220cfcc.png)
创建API后,我们将重定向到路由页面。在这里我们可以看到已经预定义的三条路线:$connect,$disconnect和 $default,我们还将创建一个自定义路由onMessage。
在我们的架构中,$connect和$disconnect routes实现以下任务:
$connect - 当调用此路由时,Lambda函数会将连接设备的连接ID添加到DynamoDB。
$disconnect - 调用此路由时,Lambda函数将从DynamoDB中删除已断开连接的设备的连接ID。
onMessage - 当调用此路由时,消息正文将被发送到当时连接的所有设备。
Step2 创建用于"存储连接设备ID"的DynamoDB
![](https://img.haomeiwen.com/i6151220/393a5b10b6e225eb.png)
Step3 创建connect Lambda函数 ChatRoomConnectFunction
![](https://img.haomeiwen.com/i6151220/a1f59c004449a3c0.png)
在lambda函数的代码中添加以下代码,此代码将连接设备的连接ID添加到我们创建的DynamoDB表中:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context, callback) => {
const connectionId = event.requestContext.connectionId;
addConnectionId(connectionId).then(() => {
callback(null, {
statusCode: 200,
})
});
}
function addConnectionId(connectionId) {
return ddb.put({
TableName: 'Chat',
Item: {
connectionid : connectionId
},
}).promise();
}
然后发布Lambda新版本
![](https://img.haomeiwen.com/i6151220/625b060b3deb03d7.png)
Step4 创建disconnect Lambda函数 ChatRoomDonnectFunction
![](https://img.haomeiwen.com/i6151220/3c6a2814a927b4bf.png)
在lambda函数的代码中添加以下代码,当设备断开连接时,此代码将从DynamoDB表中删除连接ID:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = (event, context, callback) => {
const connectionId = event.requestContext.connectionId;
deleteConnectionId(connectionId).then(() => {
callback(null, {
statusCode: 200,
})
});
}
function deleteConnectionId(connectionId) {
return ddb.delete({
TableName: 'Chat',
Key: {
connectionid : connectionId,
},
}).promise();
}
然后发布Lambda新版本
![](https://img.haomeiwen.com/i6151220/bef69139d1fdf1f2.png)
Step5 配置 $connect和 $disconnect路由,测试WebSocket API是否正常工作
现在我们已经创建了DynamoDB表和两个lambda函数。在创建第三个lambda函数之前,让我们再回到API Gateway并使用我们创建的lambda函数配置路由。首先,单击$ connect route。作为集成类型,选择Lambda函数并选择ChatRoomConnectFunction。
![](https://img.haomeiwen.com/i6151220/5530fc831c292320.png)
在$disconnect路由上执行相同的操作,其中lambda函数将是ChatRoomDonnectFunction:
![](https://img.haomeiwen.com/i6151220/60c385d5f53b6c02.png)
![](https://img.haomeiwen.com/i6151220/f84f7f8789b00750.png)
为了方便调错,建议开启CloudWatch日志记录
![](https://img.haomeiwen.com/i6151220/fb8aabb21603d0c7.png)
部署后,我们将看到两个URL。第一个URL称为WebSocket URL,第二个URL称为连接URL。
![](https://img.haomeiwen.com/i6151220/18384036af860267.png)
WebSocket URL是用于通过设备将WebSockets连接到API的URL。第二个URL,即Connection(连接)URL,向连接的客户端发送回调消息、获取连接信息或断开客户端连接。
使用wscat工具进行测试(使用 npm install -g wscat 安装)。
wscat -c wss://91ajt7fo78.execute-api.ap-northeast-2.amazonaws.com/dev
![](https://img.haomeiwen.com/i6151220/cfff66de692f1ea1.png)
查看DynamoDB可以看到connectionid
![](https://img.haomeiwen.com/i6151220/ea0c406d9632d534.png)
控制台 ctrl+z 中断连接之后再次查看DynamoDB会发现刚才的connectionid已被删除:
![](https://img.haomeiwen.com/i6151220/adfefb9d06bcb06b.png)
Step6 实现发送消息多客户端接收功能
新增lambda函数ChatRoomOnMessageFunction,查询Chat DynamoDB表,获取所有连接ID,并将消息发送给这些连接ID对应的终端:
index.js
扫描DynamoDB以获取表中的所有可用记录,lambda函数将解析“message”属性并将其发送给其他终端:
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
require('./patch.js');
let send = undefined;
function init(event) {
console.log(event)
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
});
send = async (connectionId, data) => {
await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: `Echo: ${data}` }).promise();
}
}
exports.handler = (event, context, callback) => {
init(event);
let message = JSON.parse(event.body).message
getConnections().then((data) => {
console.log(data.Items);
data.Items.forEach(function(connection) {
console.log("Connection " +connection.connectionid)
send(connection.connectionid, message);
});
});
return {}
};
function getConnections(){
return ddb.scan({
TableName: 'Chat',
}).promise();
}
patch.js
自动为我们的API创建回调URL并发送POST请求:
require('aws-sdk/lib/node_loader');
var AWS = require('aws-sdk/lib/core');
var Service = AWS.Service;
var apiLoader = AWS.apiLoader;
apiLoader.services['apigatewaymanagementapi'] = {};
AWS.ApiGatewayManagementApi = Service.defineService('apigatewaymanagementapi', ['2018-11-29']);
Object.defineProperty(apiLoader.services['apigatewaymanagementapi'], '2018-11-29', {
get: function get() {
var model = {
"metadata": {
"apiVersion": "2018-11-29",
"endpointPrefix": "execute-api",
"signingName": "execute-api",
"serviceFullName": "AmazonApiGatewayManagementApi",
"serviceId": "ApiGatewayManagementApi",
"protocol": "rest-json",
"jsonVersion": "1.1",
"uid": "apigatewaymanagementapi-2018-11-29",
"signatureVersion": "v4"
},
"operations": {
"PostToConnection": {
"http": {
"requestUri": "/@connections/{connectionId}",
"responseCode": 200
},
"input": {
"type": "structure",
"members": {
"Data": {
"type": "blob"
},
"ConnectionId": {
"location": "uri",
"locationName": "connectionId"
}
},
"required": [
"ConnectionId",
"Data"
],
"payload": "Data"
}
}
},
"shapes": {}
}
model.paginators = {
"pagination": {}
}
return model;
},
enumerable: true,
configurable: true
});
module.exports = AWS.ApiGatewayManagementApi;
在API Gateway中新建route onMessage,并指向ChatRoomOnMessageFunction Lambda函数:
![](https://img.haomeiwen.com/i6151220/295639eefd621fa0.png)
保存并点击右上角的按钮添加集成响应:
![](https://img.haomeiwen.com/i6151220/34a2aeb80482f1da.png)
部署API到相应阶段(如果部署的时候报错,请先给$default路由配置与onMessage一样的集成响应规则即可):
![](https://img.haomeiwen.com/i6151220/86def434324a531a.png)
Step7 测试功能可用
多个终端,并与API Gateway WebsocketAPI 建立连接:
![](https://img.haomeiwen.com/i6151220/d9dd6b60c123098c.png)
终端发送内容
{"action" : "onMessage" , "message" : "Hello everyone"}
![](https://img.haomeiwen.com/i6151220/755260138ffa2988.png)
END
出现以上截图则说明Demo成功。
说明:
发送内容的action字段value值对应了API Gateway Websocket的路由名称,如上截图我们的action值为onMessage,则在发送消息的时候API Gateway Websocket会找onMessage路由对应的集成请求规则进行相应。
思考:
如何在以上Demo的思路之上进行延伸,使用API Gateway Websocket+Lambda+DynamoDB搭建带权限校验的公司内部公共终端推送微服务?如何可视化PC终端推送策略?
网友评论