Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f663c8b

Browse filesBrowse files
committed
add RocketMQ demo
1 parent fa8ab02 commit f663c8b
Copy full SHA for f663c8b

File tree

Expand file treeCollapse file tree

8 files changed

+195
-1
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

8 files changed

+195
-1
lines changed
Open diff view settings
Collapse file

‎.gitignore‎

Copy file name to clipboardExpand all lines: .gitignore
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Spring-Netty/Spring-Netty.iml
1010
Spring-Netty/learnnetty.iml
1111
Spring-Security/SpringSecurityDemo.iml
1212
Spring-Security/springsecurity.iml
13+
rocketmqdemo/rocketmqdemo.iml
1314

1415
# target
1516
JdkLearn/target
@@ -18,6 +19,7 @@ Spring-AOP/target
1819
Spring-Boot/target
1920
Spring-Netty/target
2021
Spring-Security/target
22+
rocketmqdemo/target
2123

2224
# .DO_Store
2325
.DS_Store
Collapse file

‎README.md‎

Copy file name to clipboardExpand all lines: README.md
+7-1Lines changed: 7 additions & 1 deletion
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ Java流行框架源码分析,学习以及总结。项目持续更新中,不
4343

4444
✅ Dubbo源码
4545

46-
Netty源码
46+
✅ Netty源码
47+
48+
✅ RocketMQ源码
4749

4850
MyBatis源码
4951

@@ -120,6 +122,10 @@ SpringCloud源码
120122
- Dubbo底层源码学习(五)—— Dubbo的注册中心重试机制
121123
- [Dubbo底层源码学习(六)—— Dubbo的服务暴露](https://github.com/coderbruis/JavaSourceCodeLearning/blob/master/note/Dubbo/Dubbo%E5%BA%95%E5%B1%82%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E2%80%94%E2%80%94%E6%9C%8D%E5%8A%A1%E6%9A%B4%E9%9C%B2.md)
122124
- Dubbo底层源码学习(七)—— Dubbo的服务消费
125+
126+
- Netty底层源码解析
127+
128+
- RocketMQ底层源码解析
123129

124130
持续更新中...
125131

Collapse file

‎rocketmqdemo/pom.xml‎

Copy file name to clipboard
+47Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>org.springframework.boot</groupId>
7+
<artifactId>spring-boot-starter-parent</artifactId>
8+
<version>2.2.1.RELEASE</version>
9+
<relativePath/> <!-- lookup parent from repository -->
10+
</parent>
11+
<groupId>com.bruis</groupId>
12+
<artifactId>rocketmqdemo</artifactId>
13+
<version>0.0.1-SNAPSHOT</version>
14+
<name>rocketmqdemo</name>
15+
<description>rocketmqdemo</description>
16+
<properties>
17+
<java.version>1.8</java.version>
18+
<rocketmq.version>4.9.0</rocketmq.version>
19+
</properties>
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.springframework.boot</groupId>
23+
<artifactId>spring-boot-starter</artifactId>
24+
</dependency>
25+
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-starter-test</artifactId>
29+
<scope>test</scope>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.apache.rocketmq</groupId>
33+
<artifactId>rocketmq-client</artifactId>
34+
<version>${rocketmq.version}</version>
35+
</dependency>
36+
</dependencies>
37+
38+
<build>
39+
<plugins>
40+
<plugin>
41+
<groupId>org.springframework.boot</groupId>
42+
<artifactId>spring-boot-maven-plugin</artifactId>
43+
</plugin>
44+
</plugins>
45+
</build>
46+
47+
</project>
Collapse file
+13Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.bruis.rocketmqdemo;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class RocketmqdemoApplication {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(RocketmqdemoApplication.class, args);
11+
}
12+
13+
}
Collapse file
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.bruis.rocketmqdemo.demo01;
2+
3+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
8+
import org.apache.rocketmq.common.message.MessageExt;
9+
import org.apache.rocketmq.remoting.common.RemotingHelper;
10+
11+
import java.util.List;
12+
13+
/**
14+
* @author lhy
15+
*
16+
* 普通消费者
17+
*
18+
* @date 2021/7/10
19+
*/
20+
public class Consumer {
21+
22+
public static final String DEMO01_CONSUMER_GROUP_NAME = "demo01_consumer_group_name";
23+
24+
public static void main(String[] args) throws Exception {
25+
26+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(DEMO01_CONSUMER_GROUP_NAME);
27+
consumer.setNamesrvAddr(Producer.NAMESRV_ADDRESS);
28+
// 从哪开始进行消费
29+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
30+
consumer.subscribe(Producer.TOPIC_NAME,"*");
31+
consumer.registerMessageListener(new MessageListenerConcurrently() {
32+
@Override
33+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
34+
MessageExt messageExt = msgs.get(0);
35+
try {
36+
String topic = messageExt.getTopic();
37+
String tags = messageExt.getTags();
38+
String keys = messageExt.getKeys();
39+
40+
if ("keyDuplicate".equals(keys)) {
41+
System.err.println("消息消费失败");
42+
int a = 1 / 0;
43+
}
44+
45+
String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
46+
System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
47+
} catch (Exception e) {
48+
e.printStackTrace();
49+
// 消费次数
50+
// int reconsumeTimes = messageExt.getReconsumeTimes();
51+
// System.err.println("reconsumeTimes: " + reconsumeTimes);
52+
// // 重试三次
53+
// if (reconsumeTimes == 3) {
54+
// // 日志记录
55+
// // 重试补偿成功
56+
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
57+
// }
58+
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
59+
}
60+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
61+
}
62+
});
63+
64+
consumer.start();
65+
System.err.println("Consumer start ....");
66+
}
67+
}
Collapse file
+45Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.bruis.rocketmqdemo.demo01;
2+
3+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
4+
import org.apache.rocketmq.client.producer.SendResult;
5+
import org.apache.rocketmq.common.message.Message;
6+
7+
/**
8+
* @author lhy
9+
*
10+
* 普通生产者
11+
*
12+
* @date 2021/7/10
13+
*/
14+
public class Producer {
15+
16+
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
17+
18+
// 生产者组
19+
public static final String DEMO01_PRODUCER_GROUP_NAME = "demo01_producer_group_name";
20+
21+
// topic
22+
public static final String TOPIC_NAME = "demo01_topic";
23+
24+
public static void main(String[] args) throws Exception {
25+
// 指定生产者组名称
26+
DefaultMQProducer producer = new DefaultMQProducer(DEMO01_PRODUCER_GROUP_NAME);
27+
producer.setNamesrvAddr(NAMESRV_ADDRESS);
28+
29+
producer.start();
30+
31+
for (int i = 0; i < 5; i++) {
32+
Message message = new Message(TOPIC_NAME,// topic
33+
"TagA",//tag
34+
"key" + i,//keys
35+
("Hello world RocketMQ Demo01" + i).getBytes());
36+
37+
// 向broker发送消息
38+
SendResult sendResult = producer.send(message);
39+
System.out.printf("%s%n", sendResult);
40+
}
41+
42+
producer.shutdown();
43+
}
44+
45+
}
Collapse file
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Collapse file
+13Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.bruis.rocketmqdemo;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.springframework.boot.test.context.SpringBootTest;
5+
6+
@SpringBootTest
7+
class RocketmqdemoApplicationTests {
8+
9+
@Test
10+
void contextLoads() {
11+
}
12+
13+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.