Commit 1afb95db by QIANGLU

恢复mq功能

parent cbf6569f
......@@ -26,6 +26,7 @@ public class MatrixListenerContainerConfiguration implements BeanPostProcessor {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)bean;
container.setDelayLevelWhenNextConsume(2);
container.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(matrixConsumeHook());
}
return bean;
......
package com.secoo.mall.mq.config;
import com.secoo.mall.mq.hook.MatrixProducerHook;
import com.secoo.mall.mq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author qianglu
*/
@Configuration
@Import(MatrixListenerContainerConfiguration.class)
public class MatrixRocketMQAutoConfiguration {
@Bean
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MartixProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
MatrixProducerHook matrixProducerHook = new MatrixProducerHook();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new MartixProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
producer.setVipChannelEnabled(false);
} else {
producer = new MartixProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
}
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.mq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment