• 1 已解决
    0
    rabbitmq中有没有类似redis中set类型的队列?

    如果没有,该如何确保队列中数据是不重复的呢呢

  • 1 未解决
    0
    RabbitMQ异常:inequivalent arg 'x-message-ttl' for queue和Failed to check/redeclare auto-delete queue(s)

    SpringMVC项目集成RabbitMQ


    当新增了queue的x-message-ttl属性后,项目重启时报如下异常。我尝试着手动在RabbitMQ后台管理界面去重新创建这个queue,错误依旧如下。

    这个异常目前不影响运行,请问什么原因导致,怎么解决?


    | ERROR | 2020-02-04 21:25:08 | CachingConnectionFactory.java | shutdownCompleted | 292 | Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ‘x-message-ttl’ for queue ‘ca_auth’ in vhost ‘/ca-auth’: received ‘30000’ but current is ‘30000’, class-id=50, method-id=10) |

    | ERROR | 2020-02-04 21:25:08 | SimpleMessageListenerContainer.java | redeclareElementsIfNecessary | 975 | Failed to check/redeclare auto-delete queue(s). |

    attachments-2020-03-PPoIKAzC5e5f774182374.png

    attachments-2020-03-SiDnbmyM5e5f774aee215.png


    spring版本:3.2.11.RELEASE

    spring-rabbit版本:1.4.6.RELEASE


    rabbitMQ配置文件如下


    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="http://www.springframework.org/schema/beans

         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

         http://www.springframework.org/schema/rabbit

         http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

         

        <rabbit:connection-factory id="connectionFactory" virtual-host="/ca-auth" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true" publisher-returns="true"/>

        <!-- 通过指定下面的admin信息 -->

        <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />


        <!--定义queue -->

        <rabbit:queue name="ca_auth" durable="true" exclusive="false" declared-by="connectAdmin">

        <rabbit:queue-arguments>

      <entry key="x-message-ttl">

      <value>30000</value>

      </entry>

    <entry key="x-dead-letter-exchange">

    <value>dlx.file_to_file</value>

    </entry>

    <entry key="x-dead-letter-routing-key">

    <value>ca</value>

    </entry>

      </rabbit:queue-arguments>

        </rabbit:queue>

        <rabbit:queue name="dlx.ca_auth" durable="true" exclusive="false" declared-by="connectAdmin">

        <rabbit:queue-arguments>

      <entry key="x-message-ttl">

      <value>30000</value>

      </entry>

        </rabbit:queue-arguments>

        </rabbit:queue>


        <!--定义direct exchange -->

        <rabbit:direct-exchange name="file_to_file" durable="true" declared-by="connectAdmin">

            <rabbit:bindings>

                <rabbit:binding queue="ca_auth" key="ca"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

        

        <rabbit:direct-exchange name="dlx.file_to_file" durable="true" declared-by="connectAdmin">

            <rabbit:bindings>

                <rabbit:binding queue="dlx.ca_auth" key="ca"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

        

        <!--定义rabbit template -->

        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="file_to_file" confirm-callback="confirmCallBackListener" mandatory="true" return-callback="returnCallBackListener" retry-template="retryTemplate"/>


        <!--定义receiver -->

        <bean id="messageReceiver" class="com.cnpany.common.fadada.service.impl.AsynHandler"></bean>

        <!--定义listener -->

        <rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">

            <rabbit:listener queues="ca_auth" ref="messageReceiver" />

        </rabbit:listener-container>

        

        <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">

          <property name="retryPolicy">

             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">

                <property name="maxAttempts" value="3"/>

             </bean>

          </property>

       </bean>

    </beans>

    最终解决

    根据青山老师给出的提示,具体解决如下

    1、当在RabbitMQ后台管理界面去对Queue新增x-message-ttl时需要选择取值类型,此时取值不能选择String,否则提示属性可接受类型异常。注意:必须选择Number。

    attachments-2020-03-rNIJCa0W5e5f7756196a2.png

    attachments-2020-03-5i4BHmjI5e5f775d8e383.png

    2、由此可以想到,xml配置文件中的x-message-ttl必须是数值类型。再由xsd配置规则,value标签默认取值类型为String,需要指定类型完成类型转换。

    配置文件中关于x-message-ttl的配置需要增加type=“java.lang.Long”


    <rabbit:queue-arguments>

    <entry key="x-message-ttl">

    <value type="java.lang.Long">30000</value>

    </entry>

    </rabbit:queue-arguments>

  • 1 未解决
    0
    java web项目(基于servlet原生web项目),怎么集成rocketMQ消息中间件!

    1.集成rocketMQ,引入哪些jar包
    2.web项目创建producer者类和消费者类
    3.web项目启动了,生产者和消费者如何启动,是不是在web容器初始化启动生产者和消费者,还是在调用的时候?
    4.如果在web容器初始化,如何弄?
    5.希望有代码解决?

  • 1 未解决
    0
    kafka集群partition如何设置

    请问kafka集群,我有5个consumer,三个broker,如何设置partition数量,吞吐量比较高呢?

  • 1 未解决
    0
    kafka producer发送消息问题

    kafka producer 发送消息,如何保证发送成功,有且只有一次?比如:发送之后,brocker接收到了,但是返回没返回来,producer超时发送重复消息怎么解决?

  • 1 未解决
    0
    kafka如何快速定位topic的offset

    consumer_offset可以存partition的offset,当我消费完成之后会往这个consumer_offset加一条数据,那么,如果此时我重连回来,kafka怎么找到topic对应的offset给我,毕竟一个sonsumer_offset的partition数据那么多。

    就是说,当出现了新的消费者接入的时候,他是怎样从这个consumer_offset_里面去找到对应的topic和partition,然后找到对应的offset出来。



  • 1 未解决
    0
    RabbitMQ重试机制没生效

    问题描述

    希望在消费端消费出现异常时进行3次重试,当重试3次不成功后再将消息转发到死信队列


    问题出现的环境背景及自己尝试过哪些方法

    SpringMVC项目整合RabbitMQ

    spring版本:3.2.11.RELEASE

    spring-rabbit版本:1.4.6.RELEASE

    在xml配置文件中增加了RetryTemplate相关配置,并在消费端的onMessage()方法中模拟抛出异常来验证重新效果。经实际测试,异常抛出了一次,消息转发到了死信队列,重新效果没在日志中看到


    相关代码

    xml配置文件


    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="http://www.springframework.org/schema/beans

         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

         http://www.springframework.org/schema/rabbit

         http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

         

    <!--配置connection-factory,指定连接rabbit server参数 

        <rabbit:connection-factory id="connectionFactory" virtual-host="/ca-auth" username="${rabbitmq_username}" password="${rabbitmq_pwd}" host="${rabbitmq_host}" port="${rabbitmq_port}" />

        -->

        <!-- 172.31.20.2 -->

        <rabbit:connection-factory id="connectionFactory" virtual-host="/ca-auth" username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true" publisher-returns="true"/>

        <!-- 通过指定下面的admin信息 -->

        <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />


        <!--定义queue -->

        <rabbit:queue name="ca_auth" durable="true" exclusive="false" declared-by="connectAdmin">

        <rabbit:queue-arguments>

      <entry key="x-message-ttl">

      <value type="java.lang.Long">30000</value>

      </entry>

    <entry key="x-dead-letter-exchange">

    <value>dlx.file_to_file</value>

    </entry>

    <entry key="x-dead-letter-routing-key">

    <value>ca</value>

    </entry>

      </rabbit:queue-arguments>

        </rabbit:queue>

        <rabbit:queue name="dlx.ca_auth" durable="true" exclusive="false" declared-by="connectAdmin">

        <rabbit:queue-arguments>

      <entry key="x-message-ttl">

      <value type="java.lang.Long">30000</value>

      </entry>

        </rabbit:queue-arguments>

        </rabbit:queue>


        <!--定义direct exchange -->

        <rabbit:direct-exchange name="file_to_file" durable="true" declared-by="connectAdmin">

            <rabbit:bindings>

                <rabbit:binding queue="ca_auth" key="ca"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

        

        <rabbit:direct-exchange name="dlx.file_to_file" durable="true" declared-by="connectAdmin">

            <rabbit:bindings>

                <rabbit:binding queue="dlx.ca_auth" key="ca"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

        

        <!--定义rabbit template -->

        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="file_to_file" confirm-callback="confirmCallBackListener" mandatory="true" return-callback="returnCallBackListener" retry-template="retryTemplate"/>

    <!--  retry-template="retryTemplate"  -->

        <!--定义receiver -->

        <bean id="messageReceiver" class="com.cnpany.common.fadada.service.impl.AsynHandler"></bean>

        <!--定义listener -->

        <rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">

            <rabbit:listener queues="ca_auth" ref="messageReceiver" />

        </rabbit:listener-container>

        

        <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">

          <property name="retryPolicy">

             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">

                <property name="maxAttempts" value="3"/>

             </bean>

          </property>

       </bean>

    </beans>

    AsynHandler类的onMessage方法模拟异常代码


    public void onMessage(Message message) {

    this.logger.info("====MQ异步处理方法开始====");

    String msg = new String(message.getBody());

    int i = 0;

    System.out.println(10/i);

    }

    日志输出截图

    仅有一次异常记录,如果重新3次,是不是应该有3次


    attachments-2020-02-g7gKp8YG5e4ba1a36ac54.png

    RabbitMQ后台管理截图

    在死信队列中看到了转来的死信

    attachments-2020-02-lTwrRo3J5e4ba1ae0c85e.png


    你期待的结果是什么?实际看到的错误信息又是什么?

    请问为什么重新3次的效果没有看到,如果想要实现重新3次怎么实现

  • 1 未解决
    0
    java web项目(基于servlet原生web项目),怎么集成rocketMQ消息中间件!

    1.集成rocketMQ,引入哪些jar包

    2.web项目创建producer者类和消费者类

    3.web项目启动了,生产者和消费者如何启动,是不是在web容器初始化启动生产者和消费者,还是在调用的时候?

    4.如果在web容器初始化,如何弄?

    5.希望有代码解决?

  • 1 未解决
    0
    虽有rocketMQ发送消息的工具类?

    虽有rocketMQ发送消息的工具类?

    另外最好有对多客户端消费,保证唯一消费的解决方案

    求要有注释的,详细点的那种

  • 1 未解决
    0
    RocketMQ如何保证消息传输的可靠性

    像RabbitMQ保证消息的可靠性是服务端使用confirm异步回调,消费端使用手动ack。RocketMQ是相似的么?还有RabbitMQ和RocketMQ两者选型的话如何选择

  • 1 未解决
    0
    rocketMq 启动后 无法创建主题

    rocketMQ producer 执行超时

    demo运行producer时报错sendDefaultImpl call timeout

    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).

    RocketMQLog:WARN Please initialize the logger system properly.

    Exception in thread “main” org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)

    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)

    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)

    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)

    at com.study.demo.rocketmq.RocketProducer.main(RocketProducer.java:18)


    服务器是虚拟机,防火墙关闭。


  • 1 未解决
    0
    kafka如何快速定位topic的offset

    consumer_offset可以存partition的offset,当我消费完成之后会往这个consumer_offset加一条数据,那么,如果此时我重连回来,kafka怎么找到topic对应的offset给我,毕竟一个sonsumer_offset的partition数据那么多。

    就是说,当出现了新的消费者接入的时候,他是怎样从这个consumer_offset_里面去找到对应的topic和partition,然后找到对应的offset出来。