Commit 8e2a4e65 by QIANGLU

添加mq发生接收监控

parent 39629064
package com.secoo.mall.mq.config;
import com.secoo.mall.mq.hook.MatrixConsumeHook;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author qianglu
*/
@Configuration
public class MatrixListenerContainerConfiguration implements BeanPostProcessor {
private final static Logger log = LoggerFactory.getLogger(MatrixListenerContainerConfiguration.class);
@Bean
MatrixConsumeHook matrixConsumeHook(){
return new MatrixConsumeHook();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DefaultRocketMQListenerContainer){
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)bean;
container.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(matrixConsumeHook());
}
return bean;
}
}
package com.secoo.mall.mq.config; package com.secoo.mall.mq.config;
import com.ctrip.framework.apollo.spring.boot.ApolloAutoConfiguration; import com.ctrip.framework.apollo.spring.boot.ApolloAutoConfiguration;
import com.secoo.mall.mq.hook.MatrixProducerHook;
import com.secoo.mall.mq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.AccessChannel;
...@@ -9,6 +11,7 @@ import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; ...@@ -9,6 +11,7 @@ import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
...@@ -16,7 +19,8 @@ import org.springframework.util.StringUtils; ...@@ -16,7 +19,8 @@ import org.springframework.util.StringUtils;
* @author qianglu * @author qianglu
*/ */
@Configuration @Configuration
public class MatrixRocketMQAutoonfiguration { @Import(MatrixListenerContainerConfiguration.class)
public class MatrixRocketMQAutoConfiguration {
...@@ -37,19 +41,19 @@ public class MatrixRocketMQAutoonfiguration { ...@@ -37,19 +41,19 @@ public class MatrixRocketMQAutoonfiguration {
String accessChannel = rocketMQProperties.getAccessChannel(); String accessChannel = rocketMQProperties.getAccessChannel();
DefaultMQProducer producer; MartixProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey(); String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey(); String sk = rocketMQProperties.getProducer().getSecretKey();
MatrixProducerHook matrixProducerHook = new MatrixProducerHook();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)), producer = new MartixProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic()); rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
producer.setVipChannelEnabled(false); producer.setVipChannelEnabled(false);
} else { } else {
producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(), producer = new MartixProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic()); rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
} }
producer.setNamesrvAddr(nameServer); producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) { if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
...@@ -60,8 +64,9 @@ public class MatrixRocketMQAutoonfiguration { ...@@ -60,8 +64,9 @@ public class MatrixRocketMQAutoonfiguration {
producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer; return producer;
} }
} }
package com.secoo.mall.mq.hook;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixConsumeHook implements ConsumeMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixConsumeHook.class);
@Override
public String hookName() {
return "MatrixConsumeHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
try {
log.info("consumeMessageAfter,Succ:{} Topic:{},TraceContext:{},ConsumerGroup:{}",context.isSuccess(),context.getMq().getTopic(),context.getMqTraceContext(),context.getConsumerGroup());
} catch (Exception e) {//防灾冗余
}
}
}
package com.secoo.mall.mq.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixProducerHook implements SendMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixProducerHook.class);
@Override
public String hookName() {
return "MatrixProducerHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
}
@Override
public void sendMessageAfter(SendMessageContext context) {
try {
log.info("sendMessageAfter,Mid:{},Mode:{},Topic:{},Targs:{},ProducerGroup:{},BrokerAddr:{},Namespace:{},Exception:{}", context.getMessage().getProperties().get("id"), context.getCommunicationMode().name(), context.getMessage().getTopic(), context.getMessage().getTags(), context.getProducerGroup(), context.getBrokerAddr(), context.getNamespace(), context.getException()==null?null:context.getException().getMessage());
} catch (Exception e) {//防灾冗余}
}
}
}
\ No newline at end of file
package com.secoo.mall.mq.producer;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author qianglu
*/
public class MartixProducer extends DefaultMQProducer {
public MartixProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
public MartixProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.mq.config.MatrixRocketMQAutoonfiguration com.secoo.mall.mq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment