<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
监听模式-(不建议,个别异常消费者topic被踢出消费者组导致收不到消息)
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.glodon.scm.core.dao.BaseDAO;
import com.glodon.scm.core.exception.GjcBusinessException;
import com.glodon.scm.srm.inter.core.IItfHandler;
import com.glodon.scm.srm.inter.core.ItfConf;
import com.glodon.scm.srm.inter.kingdee.utils.KingDeeUtils;
import com.glodon.scm.srm.util.OutorInLogUtil;
import com.glodon.scm.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Component
public class MQConsumerListener implements MessageListenerOrderly {
@Autowired
private xxxxDao xxxxdao;
@PostConstruct
public void init() {
try {
log.info("========================MQ: 开始初始化 ===============================");
//查询配置
JSONObject extdata = mapper.getId();
RocketMQProperties rocketMQProperties = new RocketMQProperties();
rocketMQProperties.setNameServer(config.getString("namesrvAddr"));
rocketMQProperties.setAccessChannel(AccessChannel.CLOUD.name());
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
consumerConfig.setGroup(config.getString("consumerGroup"));
consumerConfig.setTopic(config.getString("topic"));
consumerConfig.setMessageModel(consumerConfig.getMessageModel());
consumerConfig.setSelectorType(consumerConfig.getSelectorType());
consumerConfig.setSelectorExpression(config.getString("tags"));
consumerConfig.setAccessKey(config.getString("accessKey"));
consumerConfig.setSecretKey(config.getString("secretKey"));
consumerConfig.setTlsEnable(consumerConfig.isTlsEnable());
consumerConfig.setEnableMsgTrace(true);
startDefaultMQPushConsumer(rocketMQProperties);
log.info("========================ZTEJ:MQ: 初始化完成===============================");
} catch (Exception e) {
log.error("初始化mq失败!", e);
}
}
private void startDefaultMQPushConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
boolean useTLS = consumerConfig.isTlsEnable();
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), consumerConfig.isEnableMsgTrace(), "RMQ_SYS_TRACE_TOPIC");
consumer.setVipChannelEnabled(false);
consumer.setNamespace(consumerConfig.getNamespace());
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
consumer.setNamesrvAddr(nameServer);
consumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
consumer.subscribe(topicName, selectorExpression);
consumer.setMessageListener(this);
consumer.setUseTLS(useTLS);
consumer.setSuspendCurrentQueueTimeMillis(10000); //消费失败重试时间间隔,设置每隔10秒重复推送一次
consumer.start();
}
public void onMessage(MessageExt messageExt, Map<String, Object> tagMap) throws Exception {
log.info("========================MQ: 接受到消息 {}===============================", JSON.toJSONString(messageExt));
String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
//todo 处理逻辑
log.info("========================ZTEJ:MQ:消息处理完成 ===============================");
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
String belongid = KingDeeUtils.getUUid();
String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
String tagName = "mq消息接收";
String tags = messageExt.getTags();
String conf_sql = "select handler from inter_conf where id =xxx limit 1 ";
Map<String, Object> tagMap = basedao.query(conf_sql, MapUtil.of("datatype", tags));
try {
if (tagMap == null || tagMap.isEmpty()) {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
log.info("========================MQ:查询Handler配置tags<=> {}===============================", messageExt.getTags());
tagName = StrUtil.toStringOrNull(tagMap.get("confname"));
onMessage(messageExt, tagMap);
} catch (Exception e) {
StringBuilder sb = new StringBuilder();
sb.append(e.getMessage()).append("\n");
for (StackTraceElement stackTraceElement : e.getStackTrace()) {
sb.append(stackTraceElement.toString()).append("\n");
}
log.error("消费者处理失败:{}", sb);
return ConsumeOrderlyStatus.SUCCESS;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
2. 手动拉取模式(建议,手动拉取,频次数量易控-简单(工作不是炫技最简单高效才是王道))
@PostConstruct
public void init() {
try {
log.info("========================MQ: 开始初始化 ===============================");
//查询配置
JSONObject extdata = mapper.getId();
RocketMQProperties rocketMQProperties = new RocketMQProperties();
rocketMQProperties.setNameServer(extdata.getString("namesrvAddr"));
rocketMQProperties.setAccessChannel(AccessChannel.CLOUD.name());
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
consumerConfig.setGroup(extdata.getString("consumerGroup"));
consumerConfig.setTopic(extdata.getString("topic"));
consumerConfig.setMessageModel(consumerConfig.getMessageModel());
consumerConfig.setSelectorType(consumerConfig.getSelectorType());
consumerConfig.setSelectorExpression(extdata.getString("tags"));
consumerConfig.setAccessKey(extdata.getString("accessKey"));
consumerConfig.setSecretKey(extdata.getString("secretKey"));
consumerConfig.setTlsEnable(consumerConfig.isTlsEnable());
consumerConfig.setEnableMsgTrace(true);
startDefaultMQPullConsumer(rocketMQProperties);
log.info("========================MQ: 初始化完成===============================");
} catch (Exception e) {
log.error("初始化mq失败!", e);
}
}
private void startDefaultMQPushConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
boolean useTLS = consumerConfig.isTlsEnable();
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), consumerConfig.isEnableMsgTrace(), "RMQ_SYS_TRACE_TOPIC");
consumer.setVipChannelEnabled(false);
consumer.setNamespace(consumerConfig.getNamespace());
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
consumer.setNamesrvAddr(nameServer);
consumer.setAccessChannel(AccessChannel.valueOf(accessChannel));
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
consumer.subscribe(topicName, selectorExpression);
consumer.setMessageListener(this);
consumer.setUseTLS(useTLS);
consumer.setSuspendCurrentQueueTimeMillis(3000 * 60); //消费失败重试时间间隔,设置每隔3分钟重复推送一次
consumer.start();
}
@SneakyThrows
public void startDefaultMQPullConsumer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
String expression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(groupName, rpcHook);
DefaultMQPullConsumer consumer = scheduleService.getDefaultMQPullConsumer();
consumer.setNamesrvAddr(nameServer);
scheduleService.setMessageModel(MessageModel.BROADCASTING);
scheduleService.registerPullTaskCallback(topicName, new PullTaskCallbackImpl(expression));
scheduleService.start();
}
public class PullTaskCallbackImpl implements PullTaskCallback {
private final String expression;
public PullTaskCallbackImpl(String expression) {
this.expression = expression;
}
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0) {
offset = 0;
}
PullResult pullResult = consumer.pull(mq, expression, offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for (MessageExt msg : list) {
doWork(msg);
}
break;
case NO_MATCHED_MSG:
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
context.setPullNextDelayTimeMillis(6000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void doWork(MessageExt messageExt) {
String body = StrUtil.str(messageExt.getBody(), Charsets.UTF_8);
String tags = messageExt.getTags();
try {
log.info("========================MQ:查询Handler配置tags<=> {}===============================", messageExt.getTags());
// todo 业务逻辑。
}
}