Commit 339cb4a0 by 李秋伟

Merge branch 'tech_mq-module-update' into 'master'

matrix-mq module init

See merge request mall/arch/matrix!38
parents 77647f2e f170958b
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-datasource</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-datasource</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -21,7 +21,7 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-datasource-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
......
# 介绍
rocketmq-starter
matrix-rocketmq-core
# 特点
......@@ -19,7 +19,7 @@ rocketmq-starter
```xml
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>rocketmq-starter</artifactId>
<artifactId>matrix-rocketmq-core</artifactId>
</dependency>
```
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix-mq</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-rocketmq-core</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5-secoo1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.secoo.mall.rocketmq;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.secoo.mall.rocketmq.util.CanalMessageUtil;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public abstract class CanalPushRocketMQConsumer extends PushRocketMQConsumer {
private static Logger logger = LoggerFactory.getLogger(CanalPushRocketMQConsumer.class);
@Override
public ConsumeConcurrentlyStatus doService(List<MessageExt> msgs){
return this.doCanalService(messageConverter(msgs));
}
public abstract ConsumeConcurrentlyStatus doCanalService(List<List<FlatMessage>> flatMessageLists);
public List<List<FlatMessage>> messageConverter(List<MessageExt> messageExts) {
if(CollectionUtils.isEmpty(messageExts)){
return null;
}
List<List<FlatMessage>> flatMessageLists = new ArrayList<List<FlatMessage>>();
for(MessageExt messageExt:messageExts){
flatMessageLists.add(CanalMessageUtil.messageConverter(messageExt));
}
return flatMessageLists;
}
}
package com.secoo.mall.rocketmq;
public class MessageContent {
private String topic;
private String tags;
private String key;
private String content;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentPushRocketMQProducer extends PushRocketMQProducer{
private static Logger LOGGER = LoggerFactory.getLogger(PersistentPushRocketMQProducer.class);
private String TOPIC = "TOPIC_ROUTER_MESSAGES";
private String TAGS = "ROUTER_MSG_SENT";
public static String TAGS_KEY = "TAGS";
public static String KEYS_KEY = "KEYS";
private Message wrapMessage(Message msg){
MessageContent messageContent = new MessageContent();
messageContent.setTopic(msg.getTopic());
String tags = msg.getProperty(TAGS_KEY);
messageContent.setTags(tags);
String keys = msg.getProperty(KEYS_KEY);
messageContent.setKey(keys);
String content = new String(msg.getBody());
messageContent.setContent(content);
Message containingMsg = new Message(TOPIC, TAGS, keys, JSON.toJSONString(messageContent).getBytes());
return containingMsg;
}
@Override
public SendResult send(Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
Message message = wrapMessage(msg);
msg.setDelayTimeLevel(getLevel());
SendResult result = getProducer().send(message);
if(LOGGER.isDebugEnabled()){
LOGGER.debug("发送RocketMQ消息,topic:{},keys:{},结果:{}",new Object[]{msg.getTopic(),msg.getKeys(),result} );
}
return result;
}
}
package com.secoo.mall.rocketmq;
import com.secoo.mall.rocketmq.util.RunTimeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* Created by Intellij idea
* User: jingxingxin
* Date: 2015/6/3
* Time: 14:05
* To change this template use File | Settings | File Templates.
*/
public abstract class PushRocketMQConsumer {
private String consumerGroup;
private String topic;
private String tags;
private String namesrvAddr;
private String messageModel;
private String consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.toString();
private String accessKey;
private String secretKey;
/**
* 消费消息线程,最小数目
*/
private int consumeThreadMin = 20;
/**
* 消费消息线程,最大数目
*/
private int consumeThreadMax = 64;
/**
* 本地队列消息数超过此阀值,开始流控
*/
private int pullThresholdForQueue = 1000;
private final Logger logger = LoggerFactory.getLogger(PushRocketMQConsumer.class);
@PostConstruct
public void restartConsumer() throws InterruptedException, MQClientException {
logger.info("初始化RocketMQ消息消费者,namesrvAddr={},topic={},tags={}", new Object[]{this.namesrvAddr, getTopic(), this.tags});
final String consumerGroup = this.consumerGroup;
final String topic = this.topic;
final DefaultMQPushConsumer[] consumer = {null};
new Thread(new Runnable() {
@Override
public void run() {
consumer[0] = new DefaultMQPushConsumer(consumerGroup,getAclRPCHook(),new AllocateMessageQueueAveragely());
consumer[0].setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
consumer[0].setNamesrvAddr(namesrvAddr);
try {
consumer[0].subscribe(topic, tags);
} catch (MQClientException e) {
logger.error("rocketmq创建失败", e);
}
if (messageModel == null) {
consumer[0].setMessageModel(MessageModel.CLUSTERING);
} else {
consumer[0].setMessageModel(MessageModel.valueOf(messageModel));
}
/** 注意新主题从来没有消费过,producer如果先启动,consumer后启动,间隔时间内producer发出的消息默认是接不到的,需要如下设置 */
consumer[0].setConsumeFromWhere(ConsumeFromWhere.valueOf(consumeFromWhere));
consumer[0].registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (msgs.isEmpty()) {
return null;
}
try {
return doService(msgs);
} catch (Throwable e) {
logger.error("消费消息异常" + e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
}
}
});
consumer[0].setConsumeThreadMin(consumeThreadMin);
consumer[0].setConsumeThreadMax(consumeThreadMax);
consumer[0].setPullThresholdForQueue(pullThresholdForQueue);
try {
consumer[0].start();
} catch (MQClientException e) {
logger.error("rocketmq启动失败", e);
}
}
}, "MQConsumer-" + topic).start();
}
public abstract ConsumeConcurrentlyStatus doService(List<MessageExt> msgs);
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getMessageModel() {
return messageModel;
}
public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}
public int getConsumeThreadMin() {
return consumeThreadMin;
}
public void setConsumeThreadMin(int consumeThreadMin) {
this.consumeThreadMin = consumeThreadMin;
}
public int getConsumeThreadMax() {
return consumeThreadMax;
}
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
public String getConsumeFromWhere() {
return consumeFromWhere;
}
public void setConsumeFromWhere(String consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public RPCHook getAclRPCHook() {
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
return new AclClientRPCHook(new SessionCredentials(this.getAccessKey(),this.getSecretKey()));
}
return null;
}
}
package com.secoo.mall.rocketmq;
import com.secoo.mall.rocketmq.util.RunTimeUtil;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
/**
* @author jingxingxin
* @version V1.0
* @Title: rocketmq-client-extension
* @Package com.secoo.rocketmq
* @Description:
* @date 2016/6/12 11:01
*/
public class PushRocketMQProducer {
private String producerGroup;
private String tags;
private String namesrvAddr;
private String topic;
private boolean retryAnotherBrokerWhenNotStoreOK = true;
private int retryTimesWhenSendAsyncFailed = 3;
private DefaultMQProducer producer ;
private int level = 0;
private Logger logger = LoggerFactory.getLogger(PushRocketMQConsumer.class);
private static final int MASSAGES =1024000;
@PostConstruct
public void init() {
this.logger.info("初始化RocketMQ消息发送者,namesrvAddr={},producerGroup={},tags={}", new Object[] { this.namesrvAddr, this.producerGroup, this.tags });
try {
DefaultMQProducer producerNew = null;
producerNew = new DefaultMQProducer(producerGroup);
producerNew.setNamesrvAddr(this.namesrvAddr);
producerNew.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
producerNew.setMaxMessageSize(MASSAGES);
producerNew.setRetryAnotherBrokerWhenNotStoreOK(retryAnotherBrokerWhenNotStoreOK);
producerNew.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);
producerNew.start();
this.producer = producerNew;
} catch (MQClientException e) {
throw new RuntimeException(e.getMessage(),e);
}
}
public SendResult send(Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
msg.setDelayTimeLevel(this.level);
SendResult result = this.producer.send(msg);
if(logger.isDebugEnabled()){
this.logger.debug("发送RocketMQ消息,topic:{},keys:{},结果:{}",new Object[]{msg.getTopic(),msg.getKeys(),result} );
}
return result;
}
public SendResult send(String newTags, String keys, byte[] body)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
Message msg = new Message(getTopic(), newTags, keys, body);
return send(msg);
}
public SendResult send(String keys, byte[] body)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException
{
Message msg = new Message(getTopic(), this.tags, keys, body);
return send(msg);
}
public String getProducerGroup() {
return producerGroup;
}
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public int getLevel() {
return level;
}
public void setLevel(int level) {
this.level = level;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public DefaultMQProducer getProducer() {
return producer;
}
}
package com.secoo.mall.rocketmq;
import com.secoo.mall.rocketmq.handler.MessageHandler;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by Edianzu on 2017-10-09.
*/
public class TagBasedPushRocketMQConsumer extends PushRocketMQConsumer {
private static Logger logger = LoggerFactory.getLogger(TagBasedPushRocketMQConsumer.class);
private Map<String, MessageHandler> tagHandlerMap = new HashMap<String, MessageHandler>();
public Map<String, MessageHandler> getTagHandlerMap() {
return tagHandlerMap;
}
public void setTagHandlerMap(Map<String, MessageHandler> tagHandlerMap) {
this.tagHandlerMap = tagHandlerMap;
}
@Override
public ConsumeConcurrentlyStatus doService(List<MessageExt> msgs) {
for (MessageExt messageExt : msgs) {
String tags = messageExt.getTags();
MessageHandler handler = tagHandlerMap.get(tags);
if (handler != null) {
try {
handler.handle(messageExt);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
package com.secoo.mall.rocketmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* Created by Edianzu on 2017-09-29.
*/
public class TxFriendlyRocketMqProducer {
private static final Logger LOG = LoggerFactory.getLogger(TxFriendlyRocketMqProducer.class);
private PushRocketMQProducer pushRocketMQProducer;
public TxFriendlyRocketMqProducer(PushRocketMQProducer pushRocketMQProducer) {
this.pushRocketMQProducer = pushRocketMQProducer;
}
private void sendMsg(String tags, String keys, String msg) {
try {
LOG.info("Send RocketMq message:" + msg);
pushRocketMQProducer.send(tags, keys, msg.getBytes("utf-8"));
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
}
}
public void send(String msg) {
send("", msg);
}
public void send(String keys, final String msg) {
String tags = pushRocketMQProducer.getTags();
send(tags, keys, msg);
}
public void send(final String tags, final String keys, final String msg) {
if (msg == null || msg.equals("")) {
throw new RuntimeException("Message body is empty.");
}
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager
.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
sendMsg(tags, keys, msg);
}
});
} else {
sendMsg(tags, keys, msg);
}
}
/**
* 事务提交后发送消息
* @param tags
* @param keys
* @param msg
*/
public void sendAfterCommit(final String tags, final String keys, final String msg) {
if (msg == null || msg.equals("")) {
throw new RuntimeException("Message body is empty.");
}
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager
.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
sendMsg(tags, keys, msg);
}
});
} else {
sendMsg(tags, keys, msg);
}
}
}
package com.secoo.mall.rocketmq.handler;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by Edianzu on 2017-10-09.
*/
public abstract class BaseMessageHandler implements MessageHandler{
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean handle(MessageExt messageExt) throws Exception {
String msgId = messageExt.getMsgId();
byte[] body = messageExt.getBody();
String message = new String(body, "utf-8");
logger.info("Message :" + msgId + " received. Message body:" + message);
boolean result = handleMessage(message, messageExt);
logger.info("Message :" + msgId + " is consumed successfully.");
return result;
}
protected abstract boolean handleMessage(String message, MessageExt messageExt) throws Exception;
}
package com.secoo.mall.rocketmq.handler;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Created by Edianzu on 2017-10-10.
*/
public class DelegateMessageHandler implements MessageHandler{
private static Logger logger = LoggerFactory.getLogger(DelegateMessageHandler.class);
private List<MessageHandler> messageHandlers = new ArrayList<MessageHandler>();
public List<MessageHandler> getMessageHandlers() {
return messageHandlers;
}
public void setMessageHandlers(List<MessageHandler> messageHandlers) {
this.messageHandlers = messageHandlers;
}
@Override
public boolean handle(MessageExt messageExt) throws Exception {
for (MessageHandler messageHandler : messageHandlers) {
try {
boolean val = messageHandler.handle(messageExt);
if (!val) {
return val;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return true;
}
}
package com.secoo.mall.rocketmq.handler;
import org.apache.rocketmq.common.message.MessageExt;
/**
* Created by Edianzu on 2017/5/8.
*/
public interface MessageHandler {
public boolean handle(MessageExt messageExt) throws Exception;
}
package com.secoo.mall.rocketmq.spring;
/**
* Created by zhouchun on 16/5/20.
*/
public interface CallBack {
void call();
}
package com.secoo.mall.rocketmq.spring;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* spring 事务提交后回调执行
* 当前如果没有事务,则立即执行<br/>
* 有事务则在事务提交后回调执行<br/>
* Created by zhouchun on 16/5/20.
*/
public class SpringTransactionCommitCallBack {
/**
* 回调
* @param callBack
*/
public static void invoke(final CallBack callBack){
if(TransactionSynchronizationManager.isSynchronizationActive()){
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
callBack.call();
}
});
}
else{
callBack.call();
}
}
}
package com.secoo.mall.mq.consumer;
package com.secoo.mall.rocketmq.util;
import com.google.protobuf.ByteString;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.*;
......
package com.secoo.mall.rocketmq.util;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class RunTimeUtil {
public static void main(String[] args) throws InterruptedException {
System.out.println(getRunTimeInfo());
System.out.println(getPid());
Thread.sleep(1000000000000000L);
}
/**
* 取线程ID
*/
public static int getPid() {
String info = getRunTimeInfo();
int pid = new Random().nextInt();
int index = info.indexOf("@");
if (index > 0) {
pid = Integer.parseInt(info.substring(0, index));
}
return pid;
}
public static String getRunTimeInfo() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String info = runtime.getName();
return info;
}
private static AtomicInteger index = new AtomicInteger();
public static String getRocketMqUniqeInstanceName() {
return "pid" + getPid() + "_index" + index.incrementAndGet();
}
}
package com.secoo.mall.rocketmq;
import com.secoo.mall.rocketmq.PushRocketMQProducer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Producer extends PushRocketMQProducer {
private static Logger logger = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) throws MQClientException,
InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ProducerGroupName需要由应用来保证唯一<br>
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
PushRocketMQProducer producer = new PushRocketMQProducer();
producer.setNamesrvAddr("localhost:9876");
producer.setTopic("TopicTest");
producer.setProducerGroup("test");
producer.init();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for (int i = 0; i < 100; i++) {
try {
{
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println("第 "+i+" 数据"+sendResult.toString());
}
/*{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
logger.info(sendResult.toString());
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
logger.info(sendResult.toString());
}*/
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
package com.secoo.mall.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class PushConsumer {
private static Logger logger = LoggerFactory.getLogger(Producer.class);
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException,
MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
consumer.setNamesrvAddr("10.0.253.3:9876;10.0.253.2:9876");
consumer.setInstanceName("Consumber");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
//consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 执行TagD的消费
}else {
System.out.println(new String(msg.getBody()));
}
} else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
\ No newline at end of file
# 介绍
matrix-rocketmq-starter
# 特点
-
# 开始
- 添加依赖
- Maven:需要在自己项目的pom.xml增加,以下配置。
**注意:前置条件需要依赖项目中需要设置matrix的parent**
```xml
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-starter</artifactId>
</dependency>
```
# 示例
......@@ -3,23 +3,28 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix</artifactId>
<artifactId>matrix-mq</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-starter</artifactId>
<artifactId>matrix-rocketmq-starter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5-secoo1.2</version>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-core</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
......
package com.secoo.mall.mq.config;
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.mq.hook.MatrixConsumeHook;
import com.secoo.mall.rocketmq.hook.MatrixConsumeHook;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package com.secoo.mall.mq.config;
package com.secoo.mall.rocketmq.config;
import com.secoo.mall.mq.hook.MatrixProducerHook;
import com.secoo.mall.mq.producer.MartixProducer;
import com.secoo.mall.rocketmq.hook.MatrixProducerHook;
import com.secoo.mall.rocketmq.producer.MartixProducer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
......
package com.secoo.mall.mq.consumer;
package com.secoo.mall.rocketmq.consumer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.secoo.mall.rocketmq.util.CanalMessageUtil;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public abstract class AbsCanalRocketMQConsumer implements RocketMQListener<MessageExt> {
private static Logger logger = LoggerFactory.getLogger(AbsCanalRocketMQConsumer.class);
@Override
public void onMessage(MessageExt messageExt) {
this.onCanalMessage(messageConverter(messageExt));
......
package com.secoo.mall.mq.hook;
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
......
package com.secoo.mall.mq.hook;
package com.secoo.mall.rocketmq.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
......
package com.secoo.mall.mq.producer;
package com.secoo.mall.rocketmq.producer;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
......
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.secoo.mall.mq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
com.secoo.mall.rocketmq.config.MatrixRocketMQAutoConfiguration
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-mq</artifactId>
<packaging>pom</packaging>
<properties>
<rocketmq.version>4.6.0</rocketmq.version>
<spring.version>5.2.1.RELEASE</spring.version>
<canal.version>1.1.5-secoo1.2</canal.version>
</properties>
<modules>
<module>matrix-rocketmq-core</module>
<module>matrix-rocketmq-starter</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
</dependencies>
</project>
\ No newline at end of file
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-mybatis</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-mybatis</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -25,17 +25,17 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-datasource-druid</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-protocol</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-protocol</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-protocol</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-protocol</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-protocol</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -29,27 +29,27 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-web-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-dubbo-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-web-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-dubbo-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<!-- Aapche Dubbo -->
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -6,7 +6,7 @@
<groupId>com.secoo.mall</groupId>
<artifactId>matrix</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
<packaging>pom</packaging>
......@@ -19,7 +19,7 @@
<module>mongodb-starter</module>
<module>elasticsearch-starter</module>
<module>openfeign-starter</module>
<module>rocketmq-starter</module>
<module>matrix-mq</module>
<module>monitor-starter</module>
<module>config-starter</module>
<module>matrix-protocol</module>
......@@ -46,87 +46,92 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>logger-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>monitor-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>config-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-util</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>redis-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-datasource-druid</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>mongodb-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>elasticsearch-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>rocketmq-starter</artifactId>
<version>1.2.4.RELEASE</version>
<artifactId>matrix-rocketmq-core</artifactId>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-rocketmq-starter</artifactId>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>openfeign-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-dubbo-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-dubbo-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-web-core</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-protocol-web-starter</artifactId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</dependency>
<dependency>
......@@ -182,12 +187,6 @@
<optional>true</optional>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<!--spring cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.4.RELEASE</version>
<version>1.2.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment