Commit d3cbf735 by qiuweili123

完善rocketmq

parent 62870eea
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>matrix-sample-rocketmq</artifactId>
<groupId>com.secoo.mall</groupId>
<version>1.3.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>matrix-sample-rocketmq-biz</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>common-util</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.secoo.mall.rocketmq.bean;
import lombok.Data;
import java.io.Serializable;
@Data
public class User implements Serializable {
private Long id;
private String name;
}
package com.secoo.mall.rocketmq.service;
import com.secoo.mall.rocketmq.bean.User;
import org.springframework.stereotype.Service;
@Service
public class UserService {
public User create(User user){
//实际业务操作
return user;
}
}
...@@ -15,6 +15,19 @@ ...@@ -15,6 +15,19 @@
<groupId>com.secoo.mall</groupId> <groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-client</artifactId> <artifactId>matrix-mq-rocketmq-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-sample-rocketmq-biz</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies> </dependencies>
......
package com.secoo.mall.rocketmq;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:spring/*.xml" })
public class Main {
public static void main(String[] args) {
}
}
package com.secoo.mall.rocketmq.config;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
}
package com.secoo.mall.rocketmq.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.secoo.mall.rocketmq.PushRocketMQConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
*1.PushRocketMQConsumer 默认消费模式都是集群消费模式,即一条消息只能被一个消费者消费。
*2.push是broker主动去向consumer推送消息,他们之间只需要保持长连接即可。pull是consumer主动去向broker拉取消息。
* 相对来讲push更便于操作,pull控制比较繁琐,不再提供示例。推荐使用push模式。
*
*/
@Slf4j
public class SimplePushConsumer extends PushRocketMQConsumer {
@Override
public ConsumeConcurrentlyStatus doService(List<MessageExt> msgs) {
for(MessageExt msg : msgs){
try {
JSONObject jsonObject = JSON.parseObject(new String(msg.getBody(), "UTF-8"));
log.info("OrderStatusChangeConsumer jsonObject:{} ", jsonObject);
}catch (Exception e){
log.error("Consumer fail:" + msg + "," + e.getMessage(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
package com.secoo.mall.rocketmq.producer;
import com.secoo.mall.rocketmq.PushRocketMQProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class SimplePushProducer extends PushRocketMQProducer {
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<bean id="simplePushConsumer" class="com.secoo.mall.rocketmq.consumer.SimplePushConsumer">
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<property name="topic" value="simple-push-topic"></property>
<property name="consumerGroup" value="simple-group"/>
<property name="messageModel" value="CLUSTERING"/>
</bean>
</beans>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<bean id="promotionPriceConcumer" class="com.secoo.mall.rocketmq.producer.SimplePushProducer">
<property name="namesrvAddr" value="${rocketmq.namesrvAddr}"/>
<property name="topic" value="simple-push-topic"/>
</bean>
</beans>
\ No newline at end of file
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
<groupId>com.secoo.mall</groupId> <groupId>com.secoo.mall</groupId>
<artifactId>matrix-mq-rocketmq-starter</artifactId> <artifactId>matrix-mq-rocketmq-starter</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-sample-rocketmq-biz</artifactId>
</dependency>
</dependencies> </dependencies>
......
package com.secoo.rocketmq;
import com.secoo.mall.common.util.json.FastJsonUtil;
import com.secoo.mall.rocketmq.bean.User;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import javax.annotation.Resource;
@SpringBootApplication
public class RoketMqSampleApplication implements ApplicationRunner {
@Resource
private RocketMQTemplate template;
@Resource
private String topicName = "roketmq-simple-springboot";
public static void main(String[] args) {
new SpringApplicationBuilder(RoketMqSampleApplication.class).run(args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
User user = new User();
user.setId(1L);
user.setName("zhangsan");
String createTagName = ":create";
//1.没有指定tag,单项发送消息
template.sendOneWay(topicName, user);
/* 注意:org.springframework.messaging.Message为默认对象,此对象是spring对所有消息体的抽象,
* 而MessageExt为RoketMq自身对象,包含更细节信息,如msgId
* */
MessageExt msg = new MessageExt();
//指定具有业务含义的msgId
msg.setMsgId(user.getId().toString());
msg.setBody(FastJsonUtil.toString(user).getBytes());
//2.指定tag,并同步发送消息
//指定tag为create发送到topic
template.syncSend(topicName + createTagName, msg);
}
}
\ No newline at end of file
package com.secoo.rocketmq.consumer;
import com.secoo.mall.rocketmq.bean.User;
import com.secoo.mall.rocketmq.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = "roketmq-simple-springboot", consumerGroup = "message-consumer",selectorExpression="create")
public class MassageConsumer implements RocketMQListener<User> {
@Resource
private UserService service;
@Override
public void onMessage(User user) {
log.info("create user.name:{}",user.getName());
//完成具体业务
service.create(user);
}
}
package com.secoo.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Slf4j
@Component
@RocketMQMessageListener(topic = "roketmq-simple-springboot", selectorExpression = "create", consumerGroup = "message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
log.info("------- MessageExtConsumer received message, msgId:{}, body:{} ", message.getMsgId(), new String(message.getBody()));
}
@Override
/**
* 可以从指定的位置进行消费
*/
public void prepareStart(DefaultMQPushConsumer consumer) {
// set consumer consume message from now
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
}
}
\ No newline at end of file
...@@ -14,7 +14,17 @@ ...@@ -14,7 +14,17 @@
<modules> <modules>
<module>matrix-sample-rocketmq-spring</module> <module>matrix-sample-rocketmq-spring</module>
<module>matrix-sample-rocketmq-springboot</module> <module>matrix-sample-rocketmq-springboot</module>
<module>matrix-sample-rocketmq-biz</module>
</modules> </modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix-sample-rocketmq-biz</artifactId>
<version>1.3.2.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement>
</project> </project>
\ No newline at end of file
...@@ -4,6 +4,14 @@ ...@@ -4,6 +4,14 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging> <packaging>pom</packaging>
<parent>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix</artifactId>
<version>1.3.2.RELEASE</version>
</parent>
<artifactId>matrix-sample</artifactId>
<modules> <modules>
<module>matrix-sample-xxl</module> <module>matrix-sample-xxl</module>
<module>matrix-sample-dubbo</module> <module>matrix-sample-dubbo</module>
...@@ -11,13 +19,7 @@ ...@@ -11,13 +19,7 @@
<module>matrix-sample-rocketmq</module> <module>matrix-sample-rocketmq</module>
<module>matrix-sample-redis</module> <module>matrix-sample-redis</module>
</modules> </modules>
<parent>
<groupId>com.secoo.mall</groupId>
<artifactId>matrix</artifactId>
<version>1.3.2.RELEASE</version>
</parent>
<artifactId>matrix-sample</artifactId>
</project> </project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment