Commit 472ca328 by 鲁强

Merge branch 'dev' into 'master'

Dev

Closes #11

See merge request mall/arch/matrix!11
parents bc0f2900 d2bf0929
......@@ -5,6 +5,7 @@ import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -17,15 +18,23 @@ public class MatrixListenerContainerConfiguration implements BeanPostProcessor {
private final static Logger log = LoggerFactory.getLogger(MatrixListenerContainerConfiguration.class);
@Value("${matrix.rocketmq.consumer.delayLevel:-1}")
private int delayLevel;
@Bean
MatrixConsumeHook matrixConsumeHook(){
MatrixConsumeHook matrixConsumeHook() {
return new MatrixConsumeHook();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)bean;
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
if (log.isDebugEnabled()) {
log.debug("MatrixListenerContainerConfiguration config setDelayLevelWhenNextConsume value:{}" , delayLevel);
}
container.setDelayLevelWhenNextConsume(delayLevel);
container.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(matrixConsumeHook());
}
return bean;
......
package com.secoo.mall.mq.config;
import com.secoo.mall.mq.hook.MatrixProducerHook;
import com.secoo.mall.mq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author qianglu
*/
@Configuration
@Import(MatrixListenerContainerConfiguration.class)
public class MatrixRocketMQAutoConfiguration {
@Bean
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MartixProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
MatrixProducerHook matrixProducerHook = new MatrixProducerHook();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new MartixProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
producer.setVipChannelEnabled(false);
} else {
producer = new MartixProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
}
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.mq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment