<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.2.2</version>
</dependency>

1. 监听模式(不建议:个别异常情况下,消费者会被踢出消费者组,导致收不到该 topic 的消息)

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.glodon.scm.core.dao.BaseDAO;
import com.glodon.scm.core.exception.GjcBusinessException;
import com.glodon.scm.srm.inter.core.IItfHandler;
import com.glodon.scm.srm.inter.core.ItfConf;
import com.glodon.scm.srm.inter.kingdee.utils.KingDeeUtils;
import com.glodon.scm.srm.util.OutorInLogUtil;
import com.glodon.scm.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
@Component
public class MQConsumerListener implements MessageListenerOrderly {

    @Autowired
    private xxxxDao xxxxdao;
    
    @PostConstruct
    public void init() {
        try {
            log.info("========================MQ: 开始初始化  ===============================");
                        //查询配置
            JSONObject extdata = mapper.getId();
            RocketMQProperties rocketMQProperties = new RocketMQProperties();
            rocketMQProperties.setNameServer(config.getString("namesrvAddr"));
            rocketMQProperties.setAccessChannel(AccessChannel.CLOUD.name());
            RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
            consumerConfig.setGroup(config.getString("consumerGroup"));
            consumerConfig.setTopic(config.getString("topic"));
            consumerConfig.setMessageModel(consumerConfig.getMessageModel());
            consumerConfig.setSelectorType(consumerConfig.getSelectorType());
            consumerConfig.setSelectorExpression(config.getString("tags"));
            consumerConfig.setAccessKey(config.getString("accessKey"));
            consumerConfig.setSecretKey(config.getString("secretKey"));
            consumerConfig.setTlsEnable(consumerConfig.isTlsEnable());
            consumerConfig.setEnableMsgTrace(true);
            startDefaultMQPushConsumer(rocketMQProperties);
            log.info("========================ZTEJ:MQ: 初始化完成===============================");
        } catch (Exception e) {
            log.error("初始化mq失败!", e);
        }
    }

    private void startDefaultMQPushConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
        String accessChannel = rocketMQProperties.getAccessChannel();
        String selectorExpression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        boolean useTLS = consumerConfig.isTlsEnable();
        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), consumerConfig.isEnableMsgTrace(), "RMQ_SYS_TRACE_TOPIC");
        consumer.setVipChannelEnabled(false);
        consumer.setNamespace(consumerConfig.getNamespace());
        consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
        consumer.setNamesrvAddr(nameServer);
        consumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
        consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
        consumer.subscribe(topicName, selectorExpression);
        consumer.setMessageListener(this);
        consumer.setUseTLS(useTLS);
        consumer.setSuspendCurrentQueueTimeMillis(10000); //消费失败重试时间间隔,设置每隔10秒重复推送一次
        consumer.start();
    }
    
    public void onMessage(MessageExt messageExt, Map<String, Object> tagMap) throws Exception {
        log.info("========================MQ: 接受到消息 {}===============================", JSON.toJSONString(messageExt));
        String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
        //todo 处理逻辑 
        log.info("========================ZTEJ:MQ:消息处理完成 ===============================");
    }

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (MessageExt messageExt : list) {
            String belongid = KingDeeUtils.getUUid();
            String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
            String tagName = "mq消息接收";
            String tags = messageExt.getTags();
            String conf_sql = "select handler from inter_conf where id =xxx limit 1 ";
            Map<String, Object> tagMap = basedao.query(conf_sql, MapUtil.of("datatype", tags));
            try {
                if (tagMap == null || tagMap.isEmpty()) {
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                log.info("========================MQ:查询Handler配置tags<=> {}===============================", messageExt.getTags());
                tagName = StrUtil.toStringOrNull(tagMap.get("confname"));
                onMessage(messageExt, tagMap);
            } catch (Exception e) {
                StringBuilder sb = new StringBuilder();
                sb.append(e.getMessage()).append("\n");
                for (StackTraceElement stackTraceElement : e.getStackTrace()) {
                    sb.append(stackTraceElement.toString()).append("\n");
                }
                log.error("消费者处理失败:{}", sb);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}


2. 手动拉取模式(建议:手动拉取,拉取频次和数量易于控制,实现简单——工作不是炫技,最简单高效才是王道)

 @PostConstruct
    /**
     * Reads the stored MQ settings, maps them onto a {@link RocketMQProperties} instance and
     * starts the scheduled pull consumer. Failures are logged and not propagated.
     */
    public void init() {
        try {
            log.info("========================MQ: 开始初始化  ===============================");

            // Fetch the stored connection settings.
            final JSONObject settings = mapper.getId();

            final RocketMQProperties props = new RocketMQProperties();
            props.setNameServer(settings.getString("namesrvAddr"));
            props.setAccessChannel(AccessChannel.CLOUD.name());

            final RocketMQProperties.Consumer target = props.getConsumer();
            target.setGroup(settings.getString("consumerGroup"));
            target.setTopic(settings.getString("topic"));
            // The next two calls write back the value the object already holds.
            target.setMessageModel(target.getMessageModel());
            target.setSelectorType(target.getSelectorType());
            target.setSelectorExpression(settings.getString("tags"));
            target.setAccessKey(settings.getString("accessKey"));
            target.setSecretKey(settings.getString("secretKey"));
            // Same here: the TLS flag is re-applied unchanged.
            target.setTlsEnable(target.isTlsEnable());
            target.setEnableMsgTrace(true);

            startDefaultMQPullConsumer(props);

            log.info("========================MQ: 初始化完成===============================");
        } catch (Exception initFailure) {
            log.error("初始化mq失败!", initFailure);
        }
    }

    /**
     * Creates a broadcasting-mode {@link DefaultMQPushConsumer} from the given properties,
     * subscribes it to the configured topic with this object as listener, and starts it.
     *
     * @param rocketMQProperties settings holding name server, access channel and consumer options
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    private void startDefaultMQPushConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
        final RocketMQProperties.Consumer settings = rocketMQProperties.getConsumer();
        final String namesrvAddr = rocketMQProperties.getNameServer();
        final String group = settings.getGroup();
        final String topic = settings.getTopic();

        // Reject blank mandatory settings before any client object is created.
        Assert.hasText(namesrvAddr, "[rocketmq.name-server] must not be null");
        Assert.hasText(group, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topic, "[rocketmq.consumer.topic] must not be null");

        // ACL hook built from the configured access key / secret key.
        final RPCHook aclHook = new AclClientRPCHook(
                new SessionCredentials(settings.getAccessKey(), settings.getSecretKey()));
        final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
                group,
                aclHook,
                new AllocateMessageQueueAveragely(),
                settings.isEnableMsgTrace(),
                "RMQ_SYS_TRACE_TOPIC");

        pushConsumer.setVipChannelEnabled(false);
        pushConsumer.setNamespace(settings.getNamespace());
        pushConsumer.setInstanceName(RocketMQUtil.getInstanceName(namesrvAddr));
        pushConsumer.setNamesrvAddr(namesrvAddr);
        pushConsumer.setAccessChannel(AccessChannel.valueOf(rocketMQProperties.getAccessChannel()));
        pushConsumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
        pushConsumer.subscribe(topic, settings.getSelectorExpression());
        pushConsumer.setMessageListener(this);
        pushConsumer.setUseTLS(settings.isTlsEnable());
        // Retry interval after a failed consume: 3000 * 60 ms = 3 minutes.
        // NOTE(review): the RocketMQ client may cap this value internally — confirm the
        // effective interval against the client version in use.
        pushConsumer.setSuspendCurrentQueueTimeMillis(3000 * 60);
        pushConsumer.start();
    }

    /**
     * Starts a scheduled pull consumer for the configured topic.
     *
     * <p>A {@link MQPullConsumerScheduleService} is created for the consumer group, pointed at
     * the configured name server, switched to broadcasting mode, and given a
     * {@link PullTaskCallbackImpl} that pulls with the configured selector expression.
     *
     * <p>The mandatory settings are validated up front, mirroring the push-consumer setup, so
     * that a missing value is reported with a clear message instead of failing later inside
     * the client.
     *
     * @param rocketMQProperties settings holding the name server and consumer options
     * @throws IllegalArgumentException if name server, consumer group or topic is blank
     */
    @SneakyThrows
    public void startDefaultMQPullConsumer(RocketMQProperties rocketMQProperties) {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
        String expression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        // ACL hook carrying the access key / secret key credentials.
        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(groupName, rpcHook);
        DefaultMQPullConsumer consumer = scheduleService.getDefaultMQPullConsumer();
        consumer.setNamesrvAddr(nameServer);
        scheduleService.setMessageModel(MessageModel.BROADCASTING);
        scheduleService.registerPullTaskCallback(topicName, new PullTaskCallbackImpl(expression));
        scheduleService.start();
    }

    public class PullTaskCallbackImpl implements PullTaskCallback {
        private final String expression;

        public PullTaskCallbackImpl(String expression) {
            this.expression = expression;
        }

        @Override
        public void doPullTask(MessageQueue mq, PullTaskContext context) {
            MQPullConsumer consumer = context.getPullConsumer();
            try {
                long offset = consumer.fetchConsumeOffset(mq, false);
                if (offset < 0) {
                    offset = 0;
                }
                PullResult pullResult = consumer.pull(mq, expression, offset, 32);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for (MessageExt msg : list) {
                            doWork(msg);
                        }
                        break;
                    case NO_MATCHED_MSG:
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                    default:
                        break;
                }
                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                context.setPullNextDelayTimeMillis(6000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void doWork(MessageExt messageExt) {
        String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
        String tags = messageExt.getTags();
        try {
            log.info("========================MQ:查询Handler配置tags<=> {}===============================", messageExt.getTags());
            // todo 业务逻辑。
        }
    }