Commit e9e169a8 by qiuweili123

调整mq模块

parent 2eae2111
......@@ -3,55 +3,34 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix-mq</artifactId>
<artifactId>matrix-bus</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-rocketmq-starter</artifactId>
<artifactId>matrix-bus-canal-starter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-core</artifactId>
<artifactId>matrix-mq-rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5-secoo1.2</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
</build>
</dependencies>
</project>
\ No newline at end of file
package com.secoo.mall.rocketmq.consumer;
package com.secoo.mall.canal.consumer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.secoo.mall.rocketmq.util.CanalMessageUtil;
import com.secoo.mall.canal.util.CanalMessageUtil;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
......@@ -20,7 +20,7 @@ public abstract class AbsCanalRocketMQConsumer implements RocketMQListener<Messa
public abstract void onCanalMessage(List<FlatMessage> flatMessages);
public List<FlatMessage> messageConverter(MessageExt messageExt) {
if(messageExt == null){
if (messageExt == null) {
return null;
}
......
package com.secoo.mall.rocketmq;
package com.secoo.mall.canal.consumer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.secoo.mall.rocketmq.util.CanalMessageUtil;
import com.secoo.mall.canal.util.CanalMessageUtil;
import com.secoo.mall.rocketmq.PushRocketMQConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
......@@ -16,19 +17,19 @@ public abstract class CanalPushRocketMQConsumer extends PushRocketMQConsumer {
private static Logger logger = LoggerFactory.getLogger(CanalPushRocketMQConsumer.class);
@Override
public ConsumeConcurrentlyStatus doService(List<MessageExt> msgs){
public ConsumeConcurrentlyStatus doService(List<MessageExt> msgs) {
return this.doCanalService(messageConverter(msgs));
}
public abstract ConsumeConcurrentlyStatus doCanalService(List<List<FlatMessage>> flatMessageLists);
public List<List<FlatMessage>> messageConverter(List<MessageExt> messageExts) {
if(CollectionUtils.isEmpty(messageExts)){
if (CollectionUtils.isEmpty(messageExts)) {
return null;
}
List<List<FlatMessage>> flatMessageLists = new ArrayList<List<FlatMessage>>();
for(MessageExt messageExt:messageExts){
for (MessageExt messageExt : messageExts) {
flatMessageLists.add(CanalMessageUtil.messageConverter(messageExt));
}
......
package com.secoo.mall.rocketmq.util;
package com.secoo.mall.canal.util;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.CanalEntry;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-bus</artifactId>
<packaging>pom</packaging>
<modules>
<module>matrix-bus-canal-starter</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix-mq</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-mq-rocketmq-client</artifactId>
<dependencies>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-util</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.secoo.mall.rocketmq;
import com.secoo.mall.common.util.string.StringUtil;
import com.secoo.mall.rocketmq.util.RunTimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
......@@ -64,7 +64,7 @@ public abstract class PushRocketMQConsumer {
@Override
public void run() {
consumer[0] = new DefaultMQPushConsumer(consumerGroup,getAclRPCHook(),new AllocateMessageQueueAveragely());
consumer[0] = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer[0].setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
consumer[0].setNamesrvAddr(namesrvAddr);
try {
......@@ -197,8 +197,8 @@ public abstract class PushRocketMQConsumer {
}
public RPCHook getAclRPCHook() {
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
return new AclClientRPCHook(new SessionCredentials(this.getAccessKey(),this.getSecretKey()));
if (StringUtil.isNotBlank(accessKey) && StringUtil.isNotBlank(secretKey)) {
return new AclClientRPCHook(new SessionCredentials(this.getAccessKey(), this.getSecretKey()));
}
return null;
......
......@@ -9,33 +9,15 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-rocketmq-core</artifactId>
<artifactId>matrix-mq-rocketmq-core</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5-secoo1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
<artifactId>rocketmq-client</artifactId>
</dependency>
</dependencies>
......
......@@ -19,7 +19,7 @@ matrix-rocketmq-starter
```xml
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-starter</artifactId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
</dependency>
```
......
......@@ -3,19 +3,18 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix</artifactId>
<artifactId>matrix-mq</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-starter</artifactId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
<version>2.1.0</version>
</dependency>
</dependencies>
<build>
......
package com.secoo.mall.rocketmq;
import com.secoo.mall.rocketmq.PushRocketMQProducer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Producer extends PushRocketMQProducer {
private static Logger logger = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) throws MQClientException,
InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
PushRocketMQProducer producer = new PushRocketMQProducer();
producer.setNamesrvAddr("localhost:9876");
producer.setTopic("TopicTest");
producer.setProducerGroup("test");
producer.init();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for (int i = 0; i < 100; i++) {
try {
{
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println("第 "+i+" 数据"+sendResult.toString());
}
/*{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
logger.info(sendResult.toString());
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
logger.info(sendResult.toString());
}*/
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class PushConsumer {
private static Logger logger = LoggerFactory.getLogger(Producer.class);
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException,
MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
consumer.setNamesrvAddr("10.0.253.3:9876;10.0.253.2:9876");
consumer.setInstanceName("Consumber");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
//consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 执行TagD的消费
}else {
System.out.println(new String(msg.getBody()));
}
} else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.rocketmq.hook.MatrixProducerHook;
import com.secoo.mall.rocketmq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author qianglu
*/
@Configuration
@Import(MatrixListenerContainerConfiguration.class)
public class MatrixRocketMQAutoConfiguration {
@Bean
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MartixProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
MatrixProducerHook matrixProducerHook = new MatrixProducerHook();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new MartixProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(), matrixProducerHook);
producer.setVipChannelEnabled(false);
} else {
producer = new MartixProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(), matrixProducerHook);
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
}
......@@ -14,17 +14,22 @@
<properties>
<rocketmq.version>4.6.0</rocketmq.version>
<spring.version>5.2.1.RELEASE</spring.version>
<canal.version>1.1.5-secoo1.2</canal.version>
</properties>
<modules>
<module>matrix-rocketmq-core</module>
<module>matrix-rocketmq-starter</module>
<module>matrix-mq-rocketmq-core</module>
<module>matrix-mq-rocketmq-starter</module>
<module>matrix-mq-rocketmq-client</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-core</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
......@@ -38,20 +43,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
<version>2.1.0</version>
</dependency>
</dependencies>
......
......@@ -24,9 +24,9 @@
<module>config-starter</module>
<module>matrix-protocol</module>
<module>matrix-datasource</module>
<module>rocketmq-starter</module>
<module>matrix-job</module>
<module>matrix-bigdata</module>
<module>matrix-bus</module>
</modules>
......@@ -104,17 +104,17 @@
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-core</artifactId>
<artifactId>matrix-mq-rocketmq-core</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>rocketmq-starter</artifactId>
<artifactId>matrix-mq-rocketmq-client</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-starter</artifactId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
......@@ -165,6 +165,7 @@
</dependency>
<!--普通jar-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
......@@ -206,6 +207,7 @@
<version>2.10</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.rocketmq.hook.MatrixConsumeHook;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author qianglu
*/
@Configuration
public class MatrixListenerContainerConfiguration implements BeanPostProcessor {
private final static Logger log = LoggerFactory.getLogger(MatrixListenerContainerConfiguration.class);
@Value("${matrix.rocketmq.consumer.delayLevel:0}")
private int delayLevel;
@Bean
MatrixConsumeHook matrixConsumeHook() {
return new MatrixConsumeHook();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
if (log.isDebugEnabled()) {
log.debug("MatrixListenerContainerConfiguration config setDelayLevelWhenNextConsume value:{}", delayLevel);
}
container.setDelayLevelWhenNextConsume(delayLevel);
container.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(matrixConsumeHook());
}
return bean;
}
}
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixConsumeHook implements ConsumeMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixConsumeHook.class);
@Override
public String hookName() {
return "MatrixConsumeHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
try {
log.info("consumeMessageAfter,Succ:{} Topic:{},TraceContext:{},ConsumerGroup:{}", context.isSuccess(), context.getMq().getTopic(), context.getMqTraceContext(), context.getConsumerGroup());
} catch (Exception e) {//防灾冗余
}
}
}
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixProducerHook implements SendMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixProducerHook.class);
@Override
public String hookName() {
return "MatrixProducerHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
}
@Override
public void sendMessageAfter(SendMessageContext context) {
try {
log.info("sendMessageAfter,TraceContext:{},Mode:{},Topic:{},Targs:{},ProducerGroup:{},BrokerAddr:{},Namespace:{},Exception:{}", context.getMqTraceContext(), context.getCommunicationMode().name(), context.getMessage().getTopic(), context.getMessage().getTags(), context.getProducerGroup(), context.getBrokerAddr(), context.getNamespace(), context.getException() == null ? null : context.getException().getMessage());
} catch (Exception e) {//防灾冗余}
}
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq.producer;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author qianglu
*/
public class MartixProducer extends DefaultMQProducer {
public MartixProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
public MartixProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
}
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.rocketmq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment