page contents

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

基于RabbitMq,怎么弄一个RPC框架,来看看代码,应该能看懂哈!

attachments-2021-02-Pwh92zM6601e368c993de.png


RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。


1. RPC 框架

我们首先通过一张图理解 RPC 的工作流程:

attachments-2021-02-o0G9ZrBM601e36a0dfecc.jpg

因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:

attachments-2021-02-rvci9qL6601e36a7da0e5.jpg

即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。


2. RPCServer 实现

2.1 Server 初始化

1.  `/**`
2.  `* 队列名、交换机名、路由键`
3.  `*/`
4.  `private static final String EXCHANGE_NAME = "rpc_exchange";`
5.  `private static final String QUEUE_NAME = "request_rpc_queue";`
6.  `private static final String ROUTING_KEY = "rpc_routing_key";`
7.  `private Connection connection = null;`
8.  `private Channel channel = null;`
9.  `private QueueingConsumer consumer = null;`
10.  `/**`
11.  `* Server的构造函数`
12.  `*/`
13.  `private RPCServer() {`
14.  `try {`
15.  `//创建链接`
16.  `ConnectionFactory factory = new ConnectionFactory();`
17.  `factory.setHost(Config.HOST);`
18.  `factory.setPort(Config.PORT);`
19.  `factory.setUsername(Config.USER);`
20.  `factory.setPassword(Config.PASSWORD);`
21.  `connection = factory.newConnection();`
22.  `//创建信道`
23.  `channel = connection.createChannel();`
24.  `//设置AMQP的通信结构`
25.  `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
26.  `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
27.  `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
28.  `//设置消费者`
29.  `consumer = new QueueingConsumer(channel);`
30.  `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);`
31.  `} catch (Exception e) {`
32.  `LOG.error("build connection failed!", e);`
33.  `}`
34.  `}`

初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。

2.2 监听队列并反馈

1.  `/**`
2.  `* 开启server`
3.  `*/`
4.  `private void startServer() {`
5.  `try {`
6.  `LOG.info("Waiting for RPC calls.....");`
7.  `while (true) {`
8.  `//获得文本消息`
9.  `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
10.  `BasicProperties props = delivery.getProperties();`
11.  `//返回消息的属性`
12.  `BasicProperties replyProps = new BasicProperties.Builder()`
13.  `.correlationId(props.getCorrelationId())`
14.  `.build();`
15.  `long receiveTime = System.currentTimeMillis();`
16.  `JSONObject json = new JSONObject();`
17.  `try {`
18.  `String message = new String(delivery.getBody(), "UTF-8");`
19.  `int n = Integer.parseInt(message);`
20.  `LOG.info("Got a request: fib(" + message + ")");`
21.  `json.put("status", "success");`
22.  `json.put("result", fib(n));`
23.  `} catch (Exception e) {`
24.  `json.put("status", "fail");`
25.  `json.put("reason", "Not a Number!");`
26.  `LOG.error("receive message failed!", e);`
27.  `} finally {`
28.  `long responseTime = System.currentTimeMillis();`
29.  `json.put("calculateTime", (responseTime - receiveTime));`
30.  `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));`
31.  `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);`
32.  `}`
33.  `}`
34.  `} catch (Exception e) {`
35.  `LOG.error("server failed!", e);`
36.  `} finally {`
37.  `if (connection != null) {`
38.  `try {`
39.  `connection.close();`
40.  `} catch (Exception e) {`
41.  `LOG.error("close failed!", e);`
42.  `}`
43.  `}`
44.  `}`
45.  `}`

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。

到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。


3. RPCClient 实现

3.1 初始化 CLient

1.  `/**`
2.  `* 消息请求的队列名、交换机名、路由键`
3.  `*/`
4.  `private static final String EXCHANGE_NAME = "rpc_exchange";`
5.  `private static final String QUEUE_NAME = "request_rpc_queue";`
6.  `private static final String ROUTING_KEY = "rpc_routing_key";`
7.  `/**`
8.  `* 消息返回的队列名、交换机名、路由键`
9.  `*/`
10.  `private static final String RESPONSE_QUEUE = "response_rpc_queue";`
11.  `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";`
12.  `/**`
13.  `* RabbitMQ的实体`
14.  `*/`
15.  `private Connection connection = null;`
16.  `private Channel channel = null;`
17.  `private QueueingConsumer consumer = null;`
18.  `/**`
19.  `* 构造客户端`
20.  `* @throws Exception`
21.  `*/`
22.  `private RPCClient() throws Exception {`
23.  `ConnectionFactory factory = new ConnectionFactory();`
24.  `factory.setHost(Config.HOST);`
25.  `factory.setPort(Config.PORT);`
26.  `factory.setUsername(Config.USER);`
27.  `factory.setPassword(Config.PASSWORD);`
28.  `connection = factory.newConnection();`
29.  `channel = connection.createChannel();`
30.  `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
31.  `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
32.  `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
33.  `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);`
34.  `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);`
35.  `consumer = new QueueingConsumer(channel);`
36.  `channel.basicConsume(RESPONSE_QUEUE, true, consumer);`
37.  `}`

这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。

3.2 发送 / 接收消息

1.  `/**`
2.  `* 请求server`
3.  `* @param message`
4.  `* @return`
5.  `* @throws Exception`
6.  `*/`
7.  `private String requestMessage(String message) throws Exception {`
8.  `String response = null;`
9.  `String corrId = UUID.randomUUID().toString();`
10.  `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();`
11.  `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));`
12.  `while (true) {`
13.  `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
14.  `if (delivery.getProperties().getCorrelationId().equals(corrId)) {`
15.  `response = new String(delivery.getBody(),"UTF-8");`
16.  `break;`
17.  `}`
18.  `}`
19.  `return response;`
20.  `}`

BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。


4. 运行测试

Client 端发送:

attachments-2021-02-Hjh5WTcy601e36cd4ae69.png

Server 端接收并处理:

attachments-2021-02-NS5lLHJe601e36d3852d1.png

Client 收到计算结果:

attachments-2021-02-sqf2gJMi601e36d9b0916.png

由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。


5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

1. `JDK1.6以上 + Maven+ RabbitMQ`

5.2 源代码

完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo

其中 Server 的代码在:

1. `rpc.RPCServer`

Client 端的代码位置:

1. `rpc.RPCClient`

以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!


attachments-2021-02-l6hWeJa4601e36ef191f7.jpg来源:https://zhuanlan.zhihu.com/p/136532347

  • 发表于 2021-02-06 14:26
  • 阅读 ( 655 )
  • 分类:中间件

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1320 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章