page contents

Java教程——一文搞懂Rabbitmq常用模式

本文讲述了Java教程——一文搞懂Rabbitmq常用模式!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-08-Vdk5f5U664d9858f248d1.png本文讲述了Java教程——一文搞懂Rabbitmq常用模式!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 实现,是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息传递指的是应用程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此通信,直接调用通常是指远程过程调用的技术。

Part2核心组成

Server:又称 Broker,接收客户端的连接,实现 AMQP 实体服务,安装 rabbitmq-server

Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手

Channel:网络信道,几乎所有操作都在 Channel 中进行,Channel 是进行消息读写的通道,客户端可以建立多个 Channel,每个 Channel 代表一个会话任务。

Message:消息,服务与应用程序之间传送的数据,由 Properties 和 Body 组成,Properties 可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body 则是消息体的内容。

Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个 exchange 和 queue,同一个虚拟主机里面不能有相同名称的 exchange

Exchange:交换机,接收消息,根据路由键发送消息到绑定的队列(不具备消息存储能力)

Bindings:exchange 和 queue 之间的虚拟连接,binding 中可以保存多个 routing key

Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息

Queue:队列,也称为 Message Queue,消息队列,保存消息并将它们转发给消费者

Part3Rabbitmq 消息模式

3.1 Simple 模式

Simple 模式是最简单的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将消息通过交换机(此时,图中并没有交换机的概念,如不定义交换机,会使用默认的交换机)把消息存储到队列,消费者从队列中取出消息进行处理。

用 Java demo 实现此模式

Productor

public class Send {

    private final static String QUEUE_NAME = "queue1";


    public static void main(String[] args) {

        // 1、创建连接工程

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setVirtualHost("/");


        Connection connection = null;

        Channel channel = null;


        try {

            // 2、创建连接、通道

            connection = factory.newConnection();

            channel = connection.createChannel();

            // 3、声明队列

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 消息内容

            String message = "Hello world";

            // 4、发送消息到指定队列

            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

            System.out.println(" [x] Sent '" + message + "'");

        } catch (TimeoutException | IOException e) {

            e.printStackTrace();

        } finally {

            // 关闭通道

            if (channel != null && channel.isOpen()) {

                try {

                    channel.close();

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

            // 关闭连接

            if (connection != null && connection.isOpen()) {

                try {

                    connection.close();

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }

    }

}


Customer

public class Recv {

    private final static String QUEUE_NAME = "queue1";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 1、创建连接工程

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setVirtualHost("/");


        // 2、获取 Connection和 Channel

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();


        // 3、声明队列

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");


        DeliverCallback deliverCallback = (consumerTag, delivery) -> {

            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");

        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {

        });

    }

}


观察可视化界面,会看到消息先会被写入到队列中,随后又被消费者消费了。

3.2 Fanout 模式

Fanout——发布订阅模式,是一种广播机制。

此模式包括:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机,交换机不存储消息,将消息存储到队列,消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上,消息将丢失。

用 Java demo 实现此模式

Productor

public class Productor {

   private static final String EXCHANGE_NAME = "fanout_exchange";

   public static void main(String[] args) {

       // 1、创建连接工程

       ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("192.168.96.109");

       factory.setUsername("admin");

       factory.setPassword("admin");

       factory.setVirtualHost("/");

       Connection connection = null;

       Channel channel = null;

       try {

           // 2、获取连接、通道

           connection = factory.newConnection();

           channel = connection.createChannel();

           // 消息内容

           String message = "hello fanout mode";

           // 指定路由key

           String routeKey = "";

           String type = "fanout";

           // 3、声明交换机

           channel.exchangeDeclare(EXCHANGE_NAME, type);

           // 4、声明队列

           channel.queueDeclare("queue1", true, false, false, null);

           channel.queueDeclare("queue2", true, false, false, null);

           channel.queueDeclare("queue3", true, false, false, null);

           channel.queueDeclare("queue4", true, false, false, null);

           // 5、绑定 channel 与 queue

           channel.queueBind("queue1", EXCHANGE_NAME, routeKey);

           channel.queueBind("queue2", EXCHANGE_NAME, routeKey);

           channel.queueBind("queue3", EXCHANGE_NAME, routeKey);

           channel.queueBind("queue4", EXCHANGE_NAME, routeKey);

           // 6、发布消息

           channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));

           System.out.println("消息发送成功!");

       } catch (IOException | TimeoutException e) {

           e.printStackTrace();

           System.out.println("消息发送异常");

       }finally {

           // 关闭通道和连接......

       }

   }

}


Customer

public class Customer {

    private static Runnable runnable = new Runnable() {

        @Override

        public void run() {

            // 创建连接工厂

            ConnectionFactory factory = new ConnectionFactory();

            factory.setHost("192.168.96.109");

            factory.setUsername("admin");

         factory.setPassword("admin");

         factory.setVirtualHost("/");


            final String queueName = Thread.currentThread().getName();

            Connection connection = null;

            Channel channel = null;

            try {

                // 获取连接、通道

                connection = factory.newConnection();

                channel = connection.createChannel();


                Channel finalChannel = channel;

                finalChannel.basicConsume(queueName, true, new DeliverCallback() {

                    @Override

                    public void handle(String consumerTag, Delivery delivery) throws IOException {

                        System.out.println(delivery.getEnvelope().getDeliveryTag());

                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    }

                }, new CancelCallback() {

                    @Override

                    public void handle(String consumerTag) throws IOException {

                    }

                });

                System.out.println(queueName + ":开始接收消息");

            } catch (IOException |

                    TimeoutException e) {

                e.printStackTrace();

            } finally {

                // 关闭通道和连接......

            }

        }

    };

    public static void main(String[] args) throws IOException, TimeoutException {

     // 创建线程分别从四个队列中获取消息

        new Thread(runnable, "queue1").start();

        new Thread(runnable, "queue2").start();

        new Thread(runnable, "queue3").start();

        new Thread(runnable, "queue4").start();

    }

}

执行完 Productor 发现四个队列中分别增加了一条消息,而执行完 Customer 后四个队列中的消息都被消费者消费了。

3.3 Direct 模式

Direct 模式是在 Fanout 模式基础上添加了 routing key,Fanout(发布/订阅)模式是交换机将消息存储到所有绑定的队列中,而 Direct 模式是在此基础上,添加了过滤条件,交换机只会将消息存储到满足 routing key 的队列中。

在上图中,我们可以看到交换机绑定了两个队列,其中队列 Q1绑定的 routing key 为 “orange” ,队列Q2绑定的routing key 为 “black” 和 “green”。在这样的设置中,发布 routing key 为 “orange” 的消息将被路由到 Q1,routing key 为 “black” 或 “green” 的消息将被路由到 Q2

在 rabbitmq 中给队列绑定 routing_key,routing_key 必须是单词列表

用 Java demo 实现此模式

Productor

public class Productor {

    private static final String EXCHANGE_NAME = "direct_exchange";


    public static void main(String[] args) {

        // 1、创建连接工程

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setUsername("admin");

        factory.setPassword("admin");

        factory.setVirtualHost("/");


        Connection connection = null;

        Channel channel = null;

        try {

            // 2、获取连接、通道

            connection = factory.newConnection();

            channel = connection.createChannel();

            // 消息内容

            String message = "hello direct mode";

            // 指定路由key

            String routeKey = "email";

            String type = "direct";

            // 3、声明交换机

            channel.exchangeDeclare(EXCHANGE_NAME, type);

            // 4、声明队列

            channel.queueDeclare("queue1", true, false, false, null);

            channel.queueDeclare("queue2", true, false, false, null);

            channel.queueDeclare("queue3", true, false, false, null);

            // 5、绑定 channel 与 queue

            channel.queueBind("queue1", EXCHANGE_NAME, "email");

            channel.queueBind("queue2", EXCHANGE_NAME, "sms");

            channel.queueBind("queue3", EXCHANGE_NAME, "vx");

   // 6、发布消息

            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));

            System.out.println("消息发送成功!");

        } catch (IOException | TimeoutException e) {

            e.printStackTrace();

            System.out.println("消息发送异常");

        } finally {

            // 关闭通道和连接......

        }

    }

}

可以通过可视化页面查看,各队列绑定的 routing_key

由于设置的 routing_key为 “email”,所以,应该只有 queue1 存储了一条消息。

Customer 与上述 fanout 示例一致。

3.4 Topic 模式

Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。

routing_key 必须是单词列表,用点分隔,其中 * 和 # 的含义为:

*:1个单词

#:0个或多个单词

用Java demo 实现此模式

Productor

public class Productor {

    private static final String EXCHANGE_NAME = "topic_exchange";


    public static void main(String[] args) {

        // 1、创建连接工程

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setUsername("admin");

        factory.setPassword("admin");

        factory.setVirtualHost("/");


        Connection connection = null;

        Channel channel = null;

        try {

           // 2、获取连接、通道

            connection = factory.newConnection();

            channel = connection.createChannel();

            // 消息内容

            String message = "hello topic mode";

            // 指定路由key

            String routeKey = "com.order.test.xxx";

            String type = "topic";

            // 3、声明交换机

            channel.exchangeDeclare(EXCHANGE_NAME, type);

            // 4、声明队列

            channel.queueDeclare("queue5",true,false,false,null);

            channel.queueDeclare("queue6",true,false,false,null);

            // 5、绑定 channel 与 queue

            channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");

            channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");

            // 6、发布消息

            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));

            System.out.println("消息发送成功!");

        } catch (IOException | TimeoutException e) {

            e.printStackTrace();

            System.out.println("消息发送异常");

        } finally {

            // 关闭通道和连接......

        }

    }

}

执行完 Productor 后,通过可视化页面查看到,queue 绑定的 routing_key

由于上述例子中,routing_key为:“com.order.test.xxx”,那么 queue5 和 queue6 都将接收到消息。

Customer 与上述实例一样,执行完 Customer 后,再次查看队列信息,queue5 和 queue6 的消息都被消费了。

3.5 Work 模式

当有多个消费者时,如何均衡消息者消费消息的多少,主要有两种模式:

轮询模式分发:按顺序轮询分发,每个消费者获得相同数量的消息

公平分发:根据消费者消费能力公平分发,处理快的处理的多,处理慢的处理的少,按劳分配

3.5.1 轮询分发

在这种模式下,rabbitmq 采用轮询的方式将任务分配给多个消费者,但可能出现一种情况,当分配给某一个消费者的任务很复杂时,而有些消费者接收的任务较轻量,会出现有的消费者很忙,而有的消费者处于空闲的状态,而 rabbitmq 不会感知到这种情况的发生,rabbitmq 不考虑消费者未确认消息的数量,只是盲目的分配任务。

用 Java demo 实现此模式

Productor

public class Productor {

    public static void main(String[] args) {

        // 1、创建连接工程

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setUsername("admin");

        factory.setPassword("admin");

        factory.setVirtualHost("/");


        Connection connection = null;

        Channel channel = null;

        try {

            // 2、获取连接、通道

            connection = factory.newConnection();

            channel = connection.createChannel();

            // 3、向 Queue1 发布20个消息

            for (int i = 0; i < 20; i++) {

                String msg = "feiyangyang: " + i;

                channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));

            }

            System.out.println("消息发送成功!");

        } catch (IOException | TimeoutException e) {

            e.printStackTrace();

            System.out.println("消息发送异常");

        } finally {

            // 关闭通道和连接......

        }

    }

}


Worker1


public class Worker1 {

    public static void main(String[] args) {

        // 1、创建连接工厂

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.96.109");

        factory.setUsername("admin");

        factory.setPassword("admin");

        factory.setVirtualHost("/");


        Connection connection = null;

        Channel channel = null;

        try {

            // 获取连接、通道

            connection = factory.newConnection();

            channel = connection.createChannel();

            Channel finalChannel = channel;

            finalChannel.basicConsume("queue1", true, new DeliverCallback() {

                @Override

                public void handle(String consumerTag, Delivery delivery) throws IOException {

                    System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

                    try {

                        Thread.sleep(2000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }, new CancelCallback() {

                @Override

                public void handle(String consumerTag) throws IOException {

                }

            });

            System.out.println("Worker1 开始接收消息");

            System.in.read();

        } catch (IOException |

                TimeoutException e) {

            e.printStackTrace();

        } finally {

            // 关闭通道和连接......

        }

    }

}


Worker2 与 Worker1 相同


我们看下消息分发结果:


Worker1 开始接收消息

Worker1:收到消息是:feiyangyang: 0

Worker1:收到消息是:feiyangyang: 2

Worker1:收到消息是:feiyangyang: 4

Worker1:收到消息是:feiyangyang: 6

Worker1:收到消息是:feiyangyang: 8

Worker1:收到消息是:feiyangyang: 10

Worker1:收到消息是:feiyangyang: 12

Worker1:收到消息是:feiyangyang: 14

Worker1:收到消息是:feiyangyang: 16

Worker1:收到消息是:feiyangyang: 18


Worker2 开始接收消息

Worker2:收到消息是:feiyangyang: 1

Worker2:收到消息是:feiyangyang: 3

Worker2:收到消息是:feiyangyang: 5

Worker2:收到消息是:feiyangyang: 7

Worker2:收到消息是:feiyangyang: 9

Worker2:收到消息是:feiyangyang: 11

Worker2:收到消息是:feiyangyang: 13

Worker2:收到消息是:feiyangyang: 15

Worker2:收到消息是:feiyangyang: 17

Worker2:收到消息是:feiyangyang: 19


可以看出,轮询分发模式就是将消息均衡的分配所有消费者。

3.5.2 公平分发

为了解决 Work 轮询分发模式 这个问题,rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。

Productor 代码与上述轮询模式相同,而 Customer 中稍作修改

Worker1

// Channel 使用 Qos 机制

finalChannel.basicQos(1);

finalChannel.basicConsume("queue1", false, new DeliverCallback() {

    @Override

    public void handle(String consumerTag, Delivery delivery) throws IOException {

        System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));

        try {

            Thread.sleep(1000);

            // 改成手动应答

            finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}, new CancelCallback() {

    @Override

    public void handle(String consumerTag) throws IOException {

    }

});


上述实例相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息,将 Worker1 的 sleep 时间设置为 1s,将 Worker2 的 sleep 时间设置为 2s,查看消息分发结果


Worker1 开始接收消息

Worker1:收到消息是:feiyangyang: 0

Worker1:收到消息是:feiyangyang: 2

Worker1:收到消息是:feiyangyang: 4

Worker1:收到消息是:feiyangyang: 5

Worker1:收到消息是:feiyangyang: 7

Worker1:收到消息是:feiyangyang: 8

Worker1:收到消息是:feiyangyang: 10

Worker1:收到消息是:feiyangyang: 11

Worker1:收到消息是:feiyangyang: 13

Worker1:收到消息是:feiyangyang: 14

Worker1:收到消息是:feiyangyang: 16

Worker1:收到消息是:feiyangyang: 17

Worker1:收到消息是:feiyangyang: 19

Worker2 开始接收消息

Worker2:收到消息是:feiyangyang: 1

Worker2:收到消息是:feiyangyang: 3

Worker2:收到消息是:feiyangyang: 6

Worker2:收到消息是:feiyangyang: 9

Worker2:收到消息是:feiyangyang: 12

Worker2:收到消息是:feiyangyang: 15

Worker2:收到消息是:feiyangyang: 18


当使用 Work 公平分发模式时,要设置消费者为手动应答,并且开启 Qos 机制。

Part4防止消息丢失机制

4.1 消息确认

消费者完成一项任务可能需要几秒钟,如果其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡,如果将 autoAck 设置为 true ,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除,在这种情况下,我们将丢失所有已分派给该特定消费者但尚未处理的消息。

如果其中一个消费者宕了,rabbitmq 可以将其消息分配给其他消费者。为了确保消息不会丢失,rabbitmq 采用消息确认,消费者发回确认消息,告诉 rabbitmq 消息已经被接收并处理,此时,rabbitmq 可以放心的删除这条消息。

如果消费者在没有发送 ack 的情况下宕了,rabbitmq 将理解为该条消息未被消费者处理完,如果有其他消费者在线,将迅速重新交付给其他消费者,这样就可以确保不会丢失消息了。

默认情况下rabbitmq 会启用手动消息确认,也就是 autoAck 默认为 false,一旦我们完成了一项任务,需要手动的进行消息确认,所以 autoAck 需要保持为默认值 false,并使用如下方法进行手动应答。

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

4.2 持久化

rabbitmq 的消息确认机制可以保证消息不会丢失,但是如果 rabbitmq 服务器停止,我们的任务仍然会丢失。

当 rabbitmq 退出或崩溃时,如果不进行持久化,队列和消息都会消失。需要做两件事来确保消息不会丢失,将队列和消息都标记为持久的。

设置队列持久

boolean durable = true;

channel.queueDeclare("hello", durable, false, false, null);

设置消息持久

channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

将消息标记为持久性并不能完全保证消息不会丢失,当 rabbitmq 接收到消息并且还没保存时,仍然有很短的时间窗口会使消息丢失,如果需要更强的保证,可以使用发布者确认机制。

Part5使用场景

解耦、削峰、异步

解耦

在微服务架构体系中,微服务A需要与微服务B进行通信,传统的做法是A调用B的接口。但这样做如果系统B无法访问或连接超时,系统A需要等待,直到系统B做出响应,并且A与B存在严重的耦合现象。如果引入消息队列进行系统AB的通信,流程是这样的:

系统A将消息存储到消息队列中,返回成功信息

系统B从队列中获取消息,进行处理操作

系统A将消息放到队列中,就不用关心系统B是否可以获取等其他事情了,实现了两个系统间的解耦。

使用场景:

短信、邮件通知

削峰

系统A每秒请求100个,系统可以稳定运行,但如果在秒杀活动中,每秒并发达到1w个,但系统最大处理能力只能每秒处理 1000 个,所以,在秒杀活动中,系统服务器会出现宕机的现象。如果引入 MQ ,可以解决这个问题。每秒 1w个请求会导致系统崩溃,那我们让用户发送的请求都存储到队列中,由于系统最大处理能力是每秒1000个请求,让系统A每秒只从队列中拉取1000个请求,保证系统能稳定运行,在秒杀期间,请求大量进入到队列,积压到MQ中,而系统每秒只从队列中取1000个请求处理。这种短暂的高峰期积压是没问题的,因为高峰期一旦过去,每秒请求数迅速递减,而系统每秒还是从队列中取1000个请求进行处理,系统会快速将积压的消息消费掉。

使用场景:

秒杀活动

团抢活动

异步

用户注册,需要发送注册邮件和注册短信,传统的做法有两种:串行、并行。

串行方式:将注册信息写库后(50ms),发送邮件(50ms),再发送短信(50ms),任务完成后,返回客户端,共耗时(150ms)

并行方式:将注册信息写库后(50ms),开启子线程让发送邮件和发送短信同时进行(50ms),返回客户端,共耗时(100ms)

引入MQ,将注册信息写库(50ms),将发送邮件和短信的操作写入队列(5s),返回客户端,而消费者什么时候从队列中取消息进行处理,不用关心,共耗时(55ms)

使用场景:

将不是必须等待响应结果的业务逻辑进行异步处理

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-08-14 09:38
  • 阅读 ( 156 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
王昭君
王昭君

209 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1316 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章