技术成就梦想

Springboot集成——阿里RocketMQ使用心得 原 荐 Springboot集成——阿里RocketMQ使用心得 HenryZhou2



一、阿里云官网—帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class MqConfig {
    /**
     * 启动测试之前请替换如下 XXX 为您的配置
     */
    public static final String PUBLIC_TOPIC = "test";//公网测试
    public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";
    public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";

    public static final String ACCESS_KEY = "123";
    public static final String SECRET_KEY = "123";
    public static final String TAG = "";
    public static final String THREAD_NUM = "25";//消费端线程数
    /**
     * ONSADDR 请根据不同Region进行配置
     * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
     * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
     * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
     * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
     */
    public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";
}

阿里云用 公有云生成,测试用公网

2、生产者

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
    <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"
          init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="ProducerId" value="" /> <!-- PID,请替换 -->
                <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,请替换 -->
                <entry key="SecretKey" value="" /> <!-- SECRET_KEY,请替换 -->
                <!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置
                 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
                 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
                 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
                 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->
                <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>
            </map>
        </property>
    </bean>
</beans>

启动方式,在使用类的全局里设置:

//初始化生产者
    private ApplicationContext ctx;
    private ProducerBean producer;

    @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭
    private boolean producerConfigEnabled;

    @PostConstruct
    public void init(){
        if (true == producerConfigEnabled) {
            ctx = new ClassPathXmlApplicationContext("producer.xml");
            producer = (ProducerBean) ctx.getBean("producer");
        }
    }

发送代码:

 try {
      String jsonC = JsonUtils.toJson(elevenMessage);
      Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());
      SendResult sendResult = producer.send(message);
      if (sendResult != null) {
          logger.info(".Send mq message success!”;

      } else {
          logger.warn(".sendResult is null.........");
      }
      } catch (Exception e) {
           logger.warn("DoubleElevenAllPreService");
           Thread.sleep(1000);//如果有异常,休眠1秒
      }

发送消息的代码一点要捕获异常,不然会重复发送。

这里的TOPIC用自己创建的,elevenMessage是自己要发送的内容

3、消费者

配置启动类:

@Configuration
@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)
public class ConsumerConfig {

    private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());

    @Bean
    public Consumer consumerFactory(){
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);
        consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);
        consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);
        //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);
        consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);
        Consumer consumer = ONSFactory.createConsumer(consumerProperties);
        consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());
        consumer.start();
        logger.info("ConsumerConfig start success.");
        

        return consumer;

    }
}

CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置

创建监听器类,消费消息:

@Component
public class MessageListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger("remind");

    protected static ElevenReposity elevenReposity;
    @Resource
    public void setElevenReposity(ElevenReposity elevenReposity){
        MessageListener .elevenReposity=elevenReposity;
    }


    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {

        if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息  json转换报错
            try {

            byte[] body = message.getBody();
            String res = new String(body);
            
            //res 是生产者传过来的消息内容

                //业务代码

            }else{
                logger.warn("!");
            }

            } catch (Exception e) {
                logger.error("MessageListener.consume error:" + e.getMessage(), e);
            }

            logger.info("MessageListener.Receive message”);
            //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
            return Action.CommitMessage;
        }else{
            logger.warn();
            return Action.ReconsumeLater;
        }

    }

注意,由于消费者是多线程的,所以对象要用static+set注入,并且无法调用父类的方法和变量

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息