Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
matrix
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
CI / CD
CI / CD
Pipelines
Schedules
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Open sidebar
mall
arch
matrix
Commits
dc29dfad
Commit
dc29dfad
authored
Dec 19, 2019
by
郑冰晶
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
canal 消息消费
parent
d6051df4
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
165 additions
and
0 deletions
+165
-0
AbsCanalRocketMQConsumer.java
rocketmq-starter/src/main/java/com/secoo/mall/mq/consumer/AbsCanalRocketMQConsumer.java
+23
-0
CanalMessageUtil.java
rocketmq-starter/src/main/java/com/secoo/mall/mq/consumer/CanalMessageUtil.java
+142
-0
No files found.
rocketmq-starter/src/main/java/com/secoo/mall/mq/consumer/AbsCanalRocketMQConsumer.java
0 → 100644
View file @
dc29dfad
package
com
.
secoo
.
mall
.
mq
.
consumer
;
import
com.alibaba.otter.canal.protocol.FlatMessage
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.spring.core.RocketMQListener
;
import
java.util.List
;
public
abstract
class
AbsCanalRocketMQConsumer
implements
RocketMQListener
<
MessageExt
>
{
@Override
public
void
onMessage
(
MessageExt
messageExt
)
{
this
.
onCanalMessage
(
messageConverter
(
messageExt
));
}
public
abstract
void
onCanalMessage
(
List
<
FlatMessage
>
flatMessages
);
public
List
<
FlatMessage
>
messageConverter
(
MessageExt
messageExt
)
{
if
(
messageExt
==
null
){
return
null
;
}
return
CanalMessageUtil
.
messageConverter
(
messageExt
);
}
}
rocketmq-starter/src/main/java/com/secoo/mall/mq/consumer/CanalMessageUtil.java
0 → 100644
View file @
dc29dfad
package
com
.
secoo
.
mall
.
mq
.
consumer
;
import
com.google.protobuf.ByteString
;
import
com.alibaba.otter.canal.client.CanalMessageDeserializer
;
import
com.alibaba.otter.canal.protocol.CanalEntry
;
import
com.alibaba.otter.canal.protocol.FlatMessage
;
import
com.alibaba.otter.canal.protocol.Message
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
java.util.*
;
public
class
CanalMessageUtil
{
public
static
List
<
FlatMessage
>
messageConverter
(
MessageExt
messageExt
)
{
Message
message
=
CanalMessageDeserializer
.
deserializer
(
messageExt
.
getBody
());
try
{
if
(
message
==
null
)
{
return
null
;
}
List
<
FlatMessage
>
flatMessages
=
new
ArrayList
<>();
List
<
CanalEntry
.
Entry
>
entrys
=
null
;
if
(
message
.
isRaw
())
{
List
<
ByteString
>
rawEntries
=
message
.
getRawEntries
();
entrys
=
new
ArrayList
<
CanalEntry
.
Entry
>(
rawEntries
.
size
());
for
(
ByteString
byteString
:
rawEntries
)
{
CanalEntry
.
Entry
entry
=
CanalEntry
.
Entry
.
parseFrom
(
byteString
);
entrys
.
add
(
entry
);
}
}
else
{
entrys
=
message
.
getEntries
();
}
for
(
CanalEntry
.
Entry
entry
:
entrys
)
{
if
(
entry
.
getEntryType
()
==
CanalEntry
.
EntryType
.
TRANSACTIONBEGIN
||
entry
.
getEntryType
()
==
CanalEntry
.
EntryType
.
TRANSACTIONEND
)
{
continue
;
}
CanalEntry
.
RowChange
rowChange
;
try
{
rowChange
=
CanalEntry
.
RowChange
.
parseFrom
(
entry
.
getStoreValue
());
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"ERROR ## parser of eromanga-event has an error , data:"
+
entry
.
toString
(),
e
);
}
CanalEntry
.
EventType
eventType
=
rowChange
.
getEventType
();
FlatMessage
flatMessage
=
new
FlatMessage
(
message
.
getId
());
flatMessages
.
add
(
flatMessage
);
flatMessage
.
setDatabase
(
entry
.
getHeader
().
getSchemaName
());
flatMessage
.
setTable
(
entry
.
getHeader
().
getTableName
());
flatMessage
.
setIsDdl
(
rowChange
.
getIsDdl
());
flatMessage
.
setType
(
eventType
.
toString
());
flatMessage
.
setEs
(
entry
.
getHeader
().
getExecuteTime
());
flatMessage
.
setTs
(
System
.
currentTimeMillis
());
flatMessage
.
setSql
(
rowChange
.
getSql
());
if
(!
rowChange
.
getIsDdl
())
{
Map
<
String
,
Integer
>
sqlType
=
new
LinkedHashMap
<>();
Map
<
String
,
String
>
mysqlType
=
new
LinkedHashMap
<>();
List
<
Map
<
String
,
String
>>
data
=
new
ArrayList
<>();
List
<
Map
<
String
,
String
>>
old
=
new
ArrayList
<>();
Set
<
String
>
updateSet
=
new
HashSet
<>();
boolean
hasInitPkNames
=
false
;
for
(
CanalEntry
.
RowData
rowData
:
rowChange
.
getRowDatasList
())
{
if
(
eventType
!=
CanalEntry
.
EventType
.
INSERT
&&
eventType
!=
CanalEntry
.
EventType
.
UPDATE
&&
eventType
!=
CanalEntry
.
EventType
.
DELETE
)
{
continue
;
}
Map
<
String
,
String
>
row
=
new
LinkedHashMap
<>();
List
<
CanalEntry
.
Column
>
columns
;
if
(
eventType
==
CanalEntry
.
EventType
.
DELETE
)
{
columns
=
rowData
.
getBeforeColumnsList
();
}
else
{
columns
=
rowData
.
getAfterColumnsList
();
}
for
(
CanalEntry
.
Column
column
:
columns
)
{
if
(!
hasInitPkNames
&&
column
.
getIsKey
())
{
flatMessage
.
addPkName
(
column
.
getName
());
}
sqlType
.
put
(
column
.
getName
(),
column
.
getSqlType
());
mysqlType
.
put
(
column
.
getName
(),
column
.
getMysqlType
());
if
(
column
.
getIsNull
())
{
row
.
put
(
column
.
getName
(),
null
);
}
else
{
row
.
put
(
column
.
getName
(),
column
.
getValue
());
}
// 获取update为true的字段
if
(
column
.
getUpdated
())
{
updateSet
.
add
(
column
.
getName
());
}
}
hasInitPkNames
=
true
;
if
(!
row
.
isEmpty
())
{
data
.
add
(
row
);
}
if
(
eventType
==
CanalEntry
.
EventType
.
UPDATE
)
{
Map
<
String
,
String
>
rowOld
=
new
LinkedHashMap
<>();
for
(
CanalEntry
.
Column
column
:
rowData
.
getBeforeColumnsList
())
{
if
(
updateSet
.
contains
(
column
.
getName
()))
{
if
(
column
.
getIsNull
())
{
rowOld
.
put
(
column
.
getName
(),
null
);
}
else
{
rowOld
.
put
(
column
.
getName
(),
column
.
getValue
());
}
}
}
// update操作将记录修改前的值
if
(!
rowOld
.
isEmpty
())
{
old
.
add
(
rowOld
);
}
}
}
if
(!
sqlType
.
isEmpty
())
{
flatMessage
.
setSqlType
(
sqlType
);
}
if
(!
mysqlType
.
isEmpty
())
{
flatMessage
.
setMysqlType
(
mysqlType
);
}
if
(!
data
.
isEmpty
())
{
flatMessage
.
setData
(
data
);
}
if
(!
old
.
isEmpty
())
{
flatMessage
.
setOld
(
old
);
}
}
}
return
flatMessages
;
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment