在RabbiMQ系列(三)work queue中我们讲了怎么用work queue去分发一个耗时任务。但是如果我们需要调用远程端的一个函数方法并等待它返回结果。那么就需要用到Remote Procedure Call(RPC),在本节中我们将用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务端,由于没有任何值得分发的耗时任务,所以我们将创建一个返回Fibonacci数的虚拟RPC服务。
Callback queue
一般来说RPC服务在RabbitMQ上实现是很容易的。一个客户端发送一个request message以及一个服务端回复一个response message。为了接收到response,我们需要发送一个callback queue地址在request中。RPC的调用结果是通过callback queue返回回来的。
Message properties
AMQP 0-9-1协议预定义了14个消息配置,但他们中的大多数都很少用到,除了下列4个外:
persistent:将消息标记为persistent(值为true)或transient(false)。
content_type:用来描述mime类型的编码,如经常使用到的json编码就是一个很好的练习,设置该属性为application/json。
reply_to:通常用来命名一个callback queue。
correlation_id:用于将RPC响应与请求关联。
Correlation Id
在上面提到的方法中,我们建议为每个RPC请求创建一个callback queue,但是这非常的低效,不过有个更好的方法:让我们为每个客户端创建一个callback queue。但是又引入了一个新问题:那就是在从queue中接收到一个response时,我们不知道这个response的结果是哪个request返回的。这是correlation_id就起到了作用。我们在每个request中都加入唯一的一个correlation_id,然后,我们从callback queue中接收到返回消息,再查看其中的correlation_id属性,这样就知道了哪个response的结果对应的是哪个request。如果收到一个未知的correlation_id值,那么我们可以安全的丢弃它,因为我们所有的request中都不包含这个值。
Summary
RPC工作图解
当Client启动时,它会创建一个匿名的exclusive callback queue。对于RPC request,Client发送的message会包含两个属性:replay_to:它是设置callback queue;correlation_id:为request设置唯一个序列值。这个request会被发送到一个叫rpc_queue的queue中。
RPC worker(又名:server)监听queue上的request请求。当有request时,它会完成请求的工作并返回一个包含结果的message给Client,用的queue就是reply_to配置的那个queue。
Client端在callback queue中等待返回数据。当有一个message出现时,它会查看这个message的correlation_id,如果它和request中的值相匹配,则返回对应的程序响应。
rpc_server.go
像往常一样,我们开始建立连接,通道,并声明队列。我们可能想要运行多个服务器进程,为了能在多个服务器上平均分配负载,我们需要设置channel的prefetch值。
我们用Channel.Consume从queue中得到一个go channel。然后另起一个goroutine进行计算工作以及返回结果。
rpc_client.go
发送requests并从msgs中获取response
其他辅助函数
运行结果:
client
rpc_server
此图之前不见了,这是晚点运行出来补上的,别在乎时间
网友评论