page contents

Java 异步处理 RabbitMQ

下面的例子是用Java编写的,但我相信它更多的是基本原理,同一个应用程序可以用任何语言来重新写。

attachments-2020-03-VvHYQs7u5e7afa36afaea.jpg

很多开发人员说,将应用程序切换到异步处理很复杂。因为他们有一个天然需要同步通信的Web应用程序。在这篇文章中,我想介绍一种方法来达到异步通信的目的:使用一些众所周知的库和工具来设计他们的系统。 下面的例子是用Java编写的,但我相信它更多的是基本原理,同一个应用程序可以用任何语言来重新写。

所需的工具和库:

  1. Spring Boot
  2. RabbitMQ


1.Web应用程序


一个用Spring MVC编写的Web应用程序并运行在Tomcat上。 它所做的只是将一个字符串发送到一个队列中 (异步通信的开始) 并等待另一个队列中的消息作为HTTP响应发送回来。

首先,我们需要定义几个依赖项,然后等待Spring Boot执行所有必要的自动配置。

  1. <dependencies>

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-web</artifactId>

  5. </dependency>

  6. <dependency>

  7. <groupId>org.springframework.boot</groupId>

  8. <artifactId>spring-boot-starter-amqp</artifactId>

  9. </dependency>

  10. <dependency>

  11. <groupId>com.thedeanda</groupId>

  12. <artifactId>lorem</artifactId>

  13. </dependency>

  14. </dependencies>


  1. @SpringBootApplication

  2. publicclassBlockingApplication{

  3. publicstaticvoid main(String[] args){

  4. SpringApplication.run(BlockingApplication.class, args);

  5. }

  6. @RestController

  7. publicstaticclassMessageController{

  8. privatefinalRabbitTemplate rabbitTemplate;

  9. publicMessageController(CachingConnectionFactory connectionFactory){

  10. this.rabbitTemplate =newRabbitTemplate(connectionFactory);

  11. }

  12. @GetMapping("invoke")

  13. publicString sendMessage(){

  14. Message response = rabbitTemplate.sendAndReceive("uppercase",null, request());

  15. returnnewString(response.getBody());

  16. }

  17. privatestaticMessage request(){

  18. Lorem LOREM =LoremIpsum.getInstance();

  19. String name = LOREM.getFirstName()+" "+ LOREM.getLastName();

  20. returnnewMessage(name.getBytes(),newMessageProperties());

  21. }

  22. }

  23. @Bean

  24. publicCachingConnectionFactory connectionFactory(){

  25. CachingConnectionFactory factory =newCachingConnectionFactory();

  26. factory.setAddresses("localhost:5672");

  27. factory.setUsername("admin");

  28. factory.setPassword("admin");

  29. return factory;

  30. }

  31. }


2.消费端应用程序


第二个应用程序仅仅是一个等待消息的RabbitMQ的消费端,将拿到的字符串转换为大写,然后将此结果发送到输出队列中。

  1. <dependencies>

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-amqp</artifactId>

  5. </dependency>

  6. </dependencies>


  1. @SpringBootApplication

  2. publicclassServiceApplication{

  3. publicstaticvoid main(String[] args){

  4. SpringApplication.run(ServiceApplication.class, args);

  5. }

  6. publicstaticclassMessageListener{

  7. publicString handleMessage(byte[] message){

  8. Random rand =newRandom();

  9. // Obtain a number between [0 - 49] + 50 = [50 - 99]

  10. int n = rand.nextInt(50)+50;

  11. String content =newString(message);

  12. try{

  13. Thread.sleep(n);

  14. }catch(InterruptedException e){

  15. e.printStackTrace();

  16. }

  17. return content.toUpperCase();

  18. }

  19. }

  20. @Bean

  21. publicCachingConnectionFactory connectionFactory(){

  22. CachingConnectionFactory factory =newCachingConnectionFactory();

  23. factory.setAddresses("localhost:5672");

  24. factory.setUsername("admin");

  25. factory.setPassword("admin");

  26. return factory;

  27. }

  28. @Bean

  29. publicSimpleMessageListenerContainer serviceListenerContainer(){

  30. SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();

  31. container.setConnectionFactory(connectionFactory());

  32. container.setConcurrentConsumers(20);

  33. container.setMaxConcurrentConsumers(40);

  34. container.setQueueNames("uppercase_messages");

  35. container.setMessageListener(newMessageListenerAdapter(newMessageListener()));

  36. return container;

  37. }

  38. }



3.底层如何执行的?


程序启动并首次调用sendMessage()方法后,我们可以看到Spring AMQP支持自动创建了一个新的回复队列并等待来自我们的服务应用程序的响应。

  1. 2019-05-1217:23:21.451 INFO 4574---[nio-8080-exec-1].l.DirectReplyToMessageListenerContainer:Container initialized for queues:[amq.rabbitmq.reply-to]

  2. 2019-05-1217:23:21.457 INFO 4574---[nio-8080-exec-1].l.DirectReplyToMessageListenerContainer:SimpleConsumer[queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started


如果我们在消费端应用程序中查看消息,我们可以看到Spring自动传播有关回复队列的信息以及相关ID,用于将其传递回Web应用程序以便能够将请求和响应配对在一起。

这就是发生魔术的地方。 当然,如果您想使其更复杂,您可以在协作中包含更多服务,然后将Web应用程序的最终响应放入与自动生成的队列不同的队列中, 该队列只具有正确的关联ID。 另外,不要忘记设置合理的超时。

这个解决方案还有一个很大的缺点 - 应用程序吞吐量。 我故意这样做,以便我可以跟进这篇文章,进一步深入调查 AsyncProfiler! 但是目前,我们使用Tomcat作为主HTTP服务器,默认为200个线程,这意味着我们的应用程序无法同时处理200多条消息,因为我们的服务器线程正在等待RabbitMQ 回复队列的响应,直到有消息进入或发生超时。


attachments-2020-03-618oUL7h5e7afa4f5f672.jpg

  • 发表于 2020-03-25 14:30
  • 阅读 ( 1189 )
  • 分类:中间件

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章