Commit 2f4b1d4b by qiuweili123

调整mq模块

parent ead49148
package com.secoo.mall.common.core.serializer;
public interface MatrixSerializer<T> {
byte[] serialize(T data);
T deserialize(byte[] data);
}
......@@ -46,6 +46,10 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
......@@ -55,6 +59,7 @@
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
......
package com.secoo.mall.common.serializer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.support.config.FastJsonConfig;
import com.secoo.mall.common.core.exception.SystemInternalException;
import com.secoo.mall.common.core.serializer.MatrixSerializer;
import lombok.Setter;
public class FastJsonSerializer<T> implements MatrixSerializer<T> {
@Setter
private FastJsonConfig fastJsonConfig = new FastJsonConfig();
private Class<T> type;
public FastJsonSerializer(Class<T> type) {
this.type = type;
}
@Override
public byte[] serialize(T t) {
if (t == null) {
return new byte[0];
}
try {
return JSON.toJSONBytes(
fastJsonConfig.getCharset(),
t,
fastJsonConfig.getSerializeConfig(),
fastJsonConfig.getSerializeFilters(),
fastJsonConfig.getDateFormat(),
JSON.DEFAULT_GENERATE_FEATURE,
fastJsonConfig.getSerializerFeatures()
);
} catch (Exception ex) {
throw new SystemInternalException();
}
}
@Override
public T deserialize(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return null;
}
try {
return (T) JSON.parseObject(
bytes,
fastJsonConfig.getCharset(),
type,
fastJsonConfig.getParserConfig(),
fastJsonConfig.getParseProcess(),
JSON.DEFAULT_PARSER_FEATURE,
fastJsonConfig.getFeatures()
);
} catch (Exception ex) {
throw new SystemInternalException();
}
}
}
package com.secoo.mall.common.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.secoo.mall.common.core.serializer.MatrixSerializer;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayOutputStream;
@Slf4j
public class KryoSerializer<T> implements MatrixSerializer<T> {
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private static final ThreadLocal<Kryo> kryos = ThreadLocal.withInitial(Kryo::new);
private Class<T> clazz;
public KryoSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) {
if (t == null) {
return EMPTY_BYTE_ARRAY;
}
Kryo kryo = kryos.get();
kryo.setReferences(false);
kryo.register(clazz);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos)) {
kryo.writeClassAndObject(output, t);
output.flush();
return baos.toByteArray();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return EMPTY_BYTE_ARRAY;
}
@Override
public T deserialize(byte[] bytes) {
if (bytes == null || bytes.length <= 0) {
return null;
}
Kryo kryo = kryos.get();
kryo.setReferences(false);
kryo.register(clazz);
try (Input input = new Input(bytes)) {
return (T) kryo.readClassAndObject(input);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}
......@@ -13,7 +13,7 @@
<dependencies>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
<artifactId>rocketmq-starter</artifactId>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
......
package com.secoo.mall.rocketmq;
import com.secoo.mall.common.core.serializer.MatrixSerializer;
import com.secoo.mall.common.serializer.FastJsonSerializer;
import com.secoo.mall.rocketmq.util.RunTimeUtil;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -9,6 +11,8 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.PostConstruct;
......@@ -27,14 +31,19 @@ public class PushRocketMQProducer {
private String topic;
private boolean retryAnotherBrokerWhenNotStoreOK = true;
private int retryTimesWhenSendAsyncFailed = 3;
private DefaultMQProducer producer ;
private int level = 0;
private DefaultMQProducer producer;
private Integer level = 0;
private Logger logger = LoggerFactory.getLogger(PushRocketMQConsumer.class);
private static final int MASSAGES =1024000;
private static final int MASSAGES = 1024000;
/**
* 注入自定义序列化器,如果空则默认JSON
*/
private MatrixSerializer serializer;
@PostConstruct
public void init() {
this.logger.info("初始化RocketMQ消息发送者,namesrvAddr={},producerGroup={},tags={}", new Object[] { this.namesrvAddr, this.producerGroup, this.tags });
this.logger.info("初始化RocketMQ消息发送者,namesrvAddr={},producerGroup={},tags={}", new Object[]{this.namesrvAddr, this.producerGroup, this.tags});
try {
DefaultMQProducer producerNew = null;
producerNew = new DefaultMQProducer(producerGroup);
......@@ -43,39 +52,99 @@ public class PushRocketMQProducer {
producerNew.setMaxMessageSize(MASSAGES);
producerNew.setRetryAnotherBrokerWhenNotStoreOK(retryAnotherBrokerWhenNotStoreOK);
producerNew.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);
if (serializer == null) {
serializer = new FastJsonSerializer(Object.class);
}
producerNew.start();
this.producer = producerNew;
} catch (MQClientException e) {
throw new RuntimeException(e.getMessage(),e);
throw new RuntimeException(e.getMessage(), e);
}
}
public SendResult send(Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setDelayTimeLevel(this.level);
SendResult result = this.producer.send(msg);
if(logger.isDebugEnabled()){
this.logger.debug("发送RocketMQ消息,topic:{},keys:{},结果:{}",new Object[]{msg.getTopic(),msg.getKeys(),result} );
if (logger.isDebugEnabled()) {
this.logger.debug("发送RocketMQ消息,topic:{},keys:{},结果:{}", new Object[]{msg.getTopic(), msg.getKeys(), result});
}
return result;
}
public SendResult send(String newTags, String keys, byte[] body)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(getTopic(), newTags, keys, body);
}
public SendResult send(String topic, String newTags, String keys, byte[] body)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Message msg = new Message(getTopic(), newTags, keys, body);
return send(msg);
}
public SendResult send(String keys, byte[] body)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Message msg = new Message(getTopic(), this.tags, keys, body);
return send(msg);
}
/**
* from secoo-framework
* 发送消息
*
* @param topic 消息主题
* @param tag 支持一个tag
* @param keys 多个keys逗号隔开
*/
public SendResult send(String topic, String tag, String keys, Object body)
throws RuntimeException {
if (body == null || topic == null) {
return null;
}
byte[] bodyArr = null;
if (body instanceof byte[]) {
bodyArr = (byte[]) body;
} else {
bodyArr = serializer.serialize(body);
}
return send(topic, tag, keys, body);
}
/**
* DB事务同步发送
*
* @param afterFlag true 提交事务后发送,false 提交事务前发送
*/
public void sendTransSynch(final String topic, final String tag, final String keys,
final Object body, boolean afterFlag) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager
.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
if (!afterFlag) {
send(topic, tag, keys, body);
}
}
@Override
public void afterCommit() {
if (afterFlag) {
send(topic, tag, keys, body);
}
}
});
} else {
send(topic, tag, keys, body);
}
}
public String getProducerGroup() {
return producerGroup;
......
/**
* Created by Administrator on 2018/1/19.
*/
package com.secoo.mall.rocketmq.serializer;
/**
* 消息序列化器
*
* @author Administrator
* @create 2018-01-19 14:29
**/
public interface MqSerializer {
/**
* 消息序列化接口
*/
<T> byte[] serialize(T obj);
}
......@@ -14,7 +14,6 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
<build>
......
......@@ -18,8 +18,8 @@
</properties>
<modules>
<module>matrix-mq-rocketmq-core</module>
<module>matrix-mq-rocketmq-starter</module>
<module>matrix-mq-rocketmq-client</module>
<module>matrix-mq-rocketmq-starter</module>
</modules>
<dependencyManagement>
......
......@@ -20,6 +20,7 @@
<module>elasticsearch-starter</module>
<module>openfeign-starter</module>
<module>matrix-mq</module>
<module>rocketmq-starter</module>
<module>monitor-starter</module>
<module>config-starter</module>
<module>matrix-protocol</module>
......@@ -112,6 +113,12 @@
<artifactId>matrix-mq-rocketmq-client</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<!--rocketmq-starter废弃,启用matrix-mq-rocketmq-starter-->
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>rocketmq-starter</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-starter</artifactId>
......@@ -206,7 +213,11 @@
<artifactId>joda-time</artifactId>
<version>2.10</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-starter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.rocketmq.hook.MatrixConsumeHook;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author qianglu
*/
@Configuration
public class MatrixListenerContainerConfiguration implements BeanPostProcessor {
private final static Logger log = LoggerFactory.getLogger(MatrixListenerContainerConfiguration.class);
@Value("${matrix.rocketmq.consumer.delayLevel:0}")
private int delayLevel;
@Bean
MatrixConsumeHook matrixConsumeHook() {
return new MatrixConsumeHook();
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
if (log.isDebugEnabled()) {
log.debug("MatrixListenerContainerConfiguration config setDelayLevelWhenNextConsume value:{}", delayLevel);
}
container.setDelayLevelWhenNextConsume(delayLevel);
container.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(matrixConsumeHook());
}
return bean;
}
}
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.rocketmq.hook.MatrixProducerHook;
import com.secoo.mall.rocketmq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author qianglu
*/
@Configuration
@Import(MatrixListenerContainerConfiguration.class)
public class MatrixRocketMQAutoConfiguration {
@Bean
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MartixProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
MatrixProducerHook matrixProducerHook = new MatrixProducerHook();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new MartixProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
producer.setVipChannelEnabled(false);
} else {
producer = new MartixProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic(),matrixProducerHook);
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
}
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixConsumeHook implements ConsumeMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixConsumeHook.class);
@Override
public String hookName() {
return "MatrixConsumeHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
try {
log.info("consumeMessageAfter,Succ:{} Topic:{},TraceContext:{},ConsumerGroup:{}", context.isSuccess(), context.getMq().getTopic(), context.getMqTraceContext(), context.getConsumerGroup());
} catch (Exception e) {//防灾冗余
}
}
}
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianglu
*/
public class MatrixProducerHook implements SendMessageHook {
private final static Logger log = LoggerFactory.getLogger(MatrixProducerHook.class);
@Override
public String hookName() {
return "MatrixProducerHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
}
@Override
public void sendMessageAfter(SendMessageContext context) {
try {
log.info("sendMessageAfter,TraceContext:{},Mode:{},Topic:{},Targs:{},ProducerGroup:{},BrokerAddr:{},Namespace:{},Exception:{}", context.getMqTraceContext(), context.getCommunicationMode().name(), context.getMessage().getTopic(), context.getMessage().getTags(), context.getProducerGroup(), context.getBrokerAddr(), context.getNamespace(), context.getException() == null ? null : context.getException().getMessage());
} catch (Exception e) {//防灾冗余}
}
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq.producer;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author qianglu
*/
public class MartixProducer extends DefaultMQProducer {
public MartixProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
public MartixProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic, SendMessageHook messageHook) {
super(producerGroup, enableMsgTrace, customizedTraceTopic);
this.defaultMQProducerImpl.registerSendMessageHook(messageHook);
}
}
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.rocketmq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment