Commit 19dfb613 by 李秋伟

Merge branch 'tech_canal-mq' into 'master'

Tech canal mq

See merge request mall/arch/matrix!36
parents efb7001c dc29dfad
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-datasource</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-datasource</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -21,7 +21,7 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-datasource-core</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-mybatis</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix-mybatis</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -25,17 +25,17 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-core</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-datasource-druid</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -6,7 +6,7 @@
<groupId>com.secoo.mall</groupId>
<artifactId>matrix</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
<packaging>pom</packaging>
......@@ -48,62 +48,62 @@
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>logger-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>monitor-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-core</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>config-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-util</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>redis-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-mybatis-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>mongodb-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>elasticsearch-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>protocol-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>rocketmq-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>openfeign-starter</artifactId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</dependency>
<dependency>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -5,7 +5,7 @@
<parent>
<artifactId>matrix</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.2.2.RELEASE</version>
<version>1.2.3.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -15,6 +15,30 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5-secoo1.2</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.secoo.mall.mq.consumer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import java.util.List;
public abstract class AbsCanalRocketMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
this.onCanalMessage(messageConverter(messageExt));
}
public abstract void onCanalMessage(List<FlatMessage> flatMessages);
public List<FlatMessage> messageConverter(MessageExt messageExt) {
if(messageExt == null){
return null;
}
return CanalMessageUtil.messageConverter(messageExt);
}
}
package com.secoo.mall.mq.consumer;
import com.google.protobuf.ByteString;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.*;
public class CanalMessageUtil {
public static List<FlatMessage> messageConverter(MessageExt messageExt) {
Message message = CanalMessageDeserializer.deserializer(messageExt.getBody());
try {
if (message == null) {
return null;
}
List<FlatMessage> flatMessages = new ArrayList<>();
List<CanalEntry.Entry> entrys = null;
if (message.isRaw()) {
List<ByteString> rawEntries = message.getRawEntries();
entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
for (ByteString byteString : rawEntries) {
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
entrys.add(entry);
}
} else {
entrys = message.getEntries();
}
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
FlatMessage flatMessage = new FlatMessage(message.getId());
flatMessages.add(flatMessage);
flatMessage.setDatabase(entry.getHeader().getSchemaName());
flatMessage.setTable(entry.getHeader().getTableName());
flatMessage.setIsDdl(rowChange.getIsDdl());
flatMessage.setType(eventType.toString());
flatMessage.setEs(entry.getHeader().getExecuteTime());
flatMessage.setTs(System.currentTimeMillis());
flatMessage.setSql(rowChange.getSql());
if (!rowChange.getIsDdl()) {
Map<String, Integer> sqlType = new LinkedHashMap<>();
Map<String, String> mysqlType = new LinkedHashMap<>();
List<Map<String, String>> data = new ArrayList<>();
List<Map<String, String>> old = new ArrayList<>();
Set<String> updateSet = new HashSet<>();
boolean hasInitPkNames = false;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
Map<String, String> row = new LinkedHashMap<>();
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
for (CanalEntry.Column column : columns) {
if (!hasInitPkNames && column.getIsKey()) {
flatMessage.addPkName(column.getName());
}
sqlType.put(column.getName(), column.getSqlType());
mysqlType.put(column.getName(), column.getMysqlType());
if (column.getIsNull()) {
row.put(column.getName(), null);
} else {
row.put(column.getName(), column.getValue());
}
// 获取update为true的字段
if (column.getUpdated()) {
updateSet.add(column.getName());
}
}
hasInitPkNames = true;
if (!row.isEmpty()) {
data.add(row);
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, String> rowOld = new LinkedHashMap<>();
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (updateSet.contains(column.getName())) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
rowOld.put(column.getName(), column.getValue());
}
}
}
// update操作将记录修改前的值
if (!rowOld.isEmpty()) {
old.add(rowOld);
}
}
}
if (!sqlType.isEmpty()) {
flatMessage.setSqlType(sqlType);
}
if (!mysqlType.isEmpty()) {
flatMessage.setMysqlType(mysqlType);
}
if (!data.isEmpty()) {
flatMessage.setData(data);
}
if (!old.isEmpty()) {
flatMessage.setOld(old);
}
}
}
return flatMessages;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment