RocketMQ消费者动态配置启动

书中人 2024年08月16日 147次浏览
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.2.2</version>
</dependency>
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.glodon.scm.core.dao.BaseDAO;
import com.glodon.scm.core.exception.GjcBusinessException;
import com.glodon.scm.srm.inter.core.IItfHandler;
import com.glodon.scm.srm.inter.core.ItfConf;
import com.glodon.scm.srm.inter.kingdee.utils.KingDeeUtils;
import com.glodon.scm.srm.util.OutorInLogUtil;
import com.glodon.scm.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Starts a RocketMQ push consumer at application start-up using connection settings that are
 * read from the database instead of static Spring properties, and consumes messages in order.
 *
 * <p>Flow: {@link #init()} loads a JSON configuration row, copies it into a
 * {@link RocketMQProperties} instance and hands it to
 * {@code startDefaultMQPushConsumer}, which registers this bean as the orderly listener.
 * Each received message is routed through {@link #consumeMessage(List, ConsumeOrderlyContext)}
 * to {@link #onMessage(MessageExt, Map)}.
 *
 * <p>NOTE(review): the consumer created here is never shut down by this class; consider adding
 * a destroy callback that calls {@code DefaultMQPushConsumer#shutdown()}.
 */
@Slf4j
@Component
public class MQConsumerListener implements MessageListenerOrderly {

    /** Trace topic handed to the consumer constructor. */
    private static final String TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    /** Value passed to {@code setSuspendCurrentQueueTimeMillis}: retry a suspended queue every 10 seconds. */
    private static final long SUSPEND_CURRENT_QUEUE_MILLIS = 10000L;

    @Autowired
    private xxxxDao xxxxdao;

    /**
     * Loads the MQ configuration from the database and starts the consumer.
     *
     * <p>Nothing is started when the configuration row is missing or its {@code config} column
     * is not JSON. Any exception is logged and swallowed (presumably so that a broken MQ setup
     * does not prevent the application from starting).
     */
    @PostConstruct
    public void init() {
        try {
            log.info("========================MQ: 开始初始化  ===============================");
            String sql = "SELECT config FROM xxxx WHERE id=:id";
            Map<String, Object> mqconfig = xxxxdao.query(sql, "mqconfig");
            // Validate the same column that is parsed below.
            String rawConfig = mqconfig == null ? null : String.valueOf(mqconfig.get("config"));
            if (rawConfig == null || !JSONUtil.isTypeJSON(rawConfig)) {
                log.warn("========================MQ: 未找到有效配置, 跳过初始化===============================");
                return;
            }
            JSONObject config = JSON.parseObject(rawConfig);
            // Log only non-sensitive fields: the raw row contains accessKey/secretKey.
            String configSummary = "namesrvAddr=" + config.getString("namesrvAddr")
                    + ", consumerGroup=" + config.getString("consumerGroup")
                    + ", topic=" + config.getString("topic")
                    + ", tags=" + config.getString("tags");
            log.info("========================MQ: 查询配置 {}===============================", configSummary);

            RocketMQProperties rocketMQProperties = new RocketMQProperties();
            rocketMQProperties.setNameServer(config.getString("namesrvAddr"));
            rocketMQProperties.setAccessChannel(AccessChannel.CLOUD.name());
            // Message model, selector type and TLS flag keep the defaults of RocketMQProperties.Consumer.
            RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
            consumerConfig.setGroup(config.getString("consumerGroup"));
            consumerConfig.setTopic(config.getString("topic"));
            consumerConfig.setSelectorExpression(config.getString("tags"));
            consumerConfig.setAccessKey(config.getString("accessKey"));
            consumerConfig.setSecretKey(config.getString("secretKey"));
            consumerConfig.setEnableMsgTrace(true);
            startDefaultMQPushConsumer(rocketMQProperties);
            log.info("========================ZTEJ:MQ: 初始化完成===============================");
        } catch (Exception e) {
            log.error("初始化mq失败!", e);
        }
    }

    /**
     * Builds, configures and starts a {@link DefaultMQPushConsumer} that delivers to this listener.
     *
     * @param rocketMQProperties settings to apply; name server, consumer group and topic are mandatory
     * @throws MQClientException if subscribing or starting the consumer fails
     * @throws IllegalArgumentException if a mandatory setting is blank
     */
    private void startDefaultMQPushConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");

        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        // Only install the ACL hook when credentials are actually configured; a hook built from
        // missing keys cannot sign requests.
        RPCHook rpcHook = null;
        if (StrUtil.isNotBlank(ak) && StrUtil.isNotBlank(sk)) {
            rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
        }

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                groupName, rpcHook, new AllocateMessageQueueAveragely(),
                consumerConfig.isEnableMsgTrace(), TRACE_TOPIC);
        consumer.setVipChannelEnabled(false);
        consumer.setNamespace(consumerConfig.getNamespace());
        consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
        consumer.setNamesrvAddr(nameServer);
        String accessChannel = rocketMQProperties.getAccessChannel();
        if (StrUtil.isNotBlank(accessChannel)) {
            consumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
        consumer.setUseTLS(consumerConfig.isTlsEnable());
        // Retry interval after a failed consumption: redeliver every 10 seconds.
        consumer.setSuspendCurrentQueueTimeMillis(SUSPEND_CURRENT_QUEUE_MILLIS);
        consumer.subscribe(topicName, consumerConfig.getSelectorExpression());
        consumer.setMessageListener(this);
        consumer.start();
    }

    /**
     * Handles a single message. The business logic is still a TODO; currently the message is only logged.
     *
     * @param messageExt the received message
     * @param tagMap handler configuration row looked up for the message's tag
     * @throws Exception if handling fails; the caller logs it and moves on to the next message
     */
    public void onMessage(MessageExt messageExt, Map<String, Object> tagMap) throws Exception {
        log.info("========================MQ: 接受到消息 {}===============================", JSON.toJSONString(messageExt));
        String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
        // TODO: business handling logic goes here
        log.info("========================ZTEJ:MQ:消息处理完成 ===============================");
    }

    /**
     * Processes a batch of messages in order.
     *
     * <p>For every message the handler configuration is looked up by tag. If the lookup fails or
     * yields nothing, the queue is suspended so the batch is redelivered later. If handling a
     * message throws, the failure is logged and the message is skipped; the remaining messages
     * of the batch are still processed.
     *
     * @param list messages to consume
     * @param consumeOrderlyContext consume context supplied by RocketMQ (unused)
     * @return {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when handler configuration is unavailable,
     *         otherwise {@code SUCCESS}
     */
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        // NOTE(review): "id =xxx" is a placeholder left in the statement; it has to become a real
        // predicate that uses the "datatype" parameter bound below.
        String conf_sql = "select handler from inter_conf where id =xxx limit 1 ";
        for (MessageExt messageExt : list) {
            String tags = messageExt.getTags();

            Map<String, Object> tagMap;
            try {
                tagMap = xxxxdao.query(conf_sql, MapUtil.of("datatype", tags));
            } catch (Exception e) {
                // Lookup failures are treated like a missing configuration: retry later.
                log.error("========================MQ:查询Handler配置失败 tags<=> {}===============================", tags, e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            if (tagMap == null || tagMap.isEmpty()) {
                log.warn("========================MQ:未找到Handler配置 tags<=> {}===============================", tags);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }

            log.info("========================MQ:查询Handler配置tags<=> {}===============================", tags);
            try {
                onMessage(messageExt, tagMap);
            } catch (Exception e) {
                // Best effort: log with the full stack trace (including causes) and continue with
                // the next message instead of acknowledging the rest of the batch unprocessed.
                log.error("消费者处理失败:{}", e.getMessage(), e);
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}