Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c64785d

Browse filesBrowse files
committed
[A] rocketmq 异步发送消息
1 parent f663c8b commit c64785d
Copy full SHA for c64785d

File tree

Expand file treeCollapse file tree

4 files changed

+124
-3
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+124
-3
lines changed
Open diff view settings
Collapse file

‎README.md‎

Copy file name to clipboardExpand all lines: README.md
+2-2Lines changed: 2 additions & 2 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ SpringCloud源码
101101
- [二进制运算以及源码、反码以及补码学习](https://github.com/coderbruis/JavaSourceLearning/blob/master/note/Netty/%E4%BA%8C%E8%BF%9B%E5%88%B6.md)
102102
- [Netty源码包结构](https://github.com/coderbruis/JavaSourceLearning/blob/master/note/Netty/Netty%E6%BA%90%E7%A0%81%E5%8C%85%E7%BB%93%E6%9E%84.md)
103103
- [Netty中的EventLoopGroup](https://github.com/coderbruis/JavaSourceLearning/blob/master/note/Netty/Netty%E4%B8%AD%E7%9A%84EventLoopGroup%E6%98%AF%E4%BB%80%E4%B9%88.md)
104+
- [Netty中的ChannelPipeline]()
105+
- [Netty中的内存分配]()
104106

105107
- SpringSecurity&OAuth2源码学习
106108
- SpringSecurity版本:5.1.0.RELEASE
@@ -122,8 +124,6 @@ SpringCloud源码
122124
- Dubbo底层源码学习(五)—— Dubbo的注册中心重试机制
123125
- [Dubbo底层源码学习(六)—— Dubbo的服务暴露](https://github.com/coderbruis/JavaSourceCodeLearning/blob/master/note/Dubbo/Dubbo%E5%BA%95%E5%B1%82%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E2%80%94%E2%80%94%E6%9C%8D%E5%8A%A1%E6%9A%B4%E9%9C%B2.md)
124126
- Dubbo底层源码学习(七)—— Dubbo的服务消费
125-
126-
- Netty底层源码解析
127127

128128
- RocketMQ底层源码解析
129129

Collapse file

‎rocketmqdemo/src/main/java/com/bruis/rocketmqdemo/demo01/Producer.java‎

Copy file name to clipboardExpand all lines: rocketmqdemo/src/main/java/com/bruis/rocketmqdemo/demo01/Producer.java
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static void main(String[] args) throws Exception {
3434
"key" + i,//keys
3535
("Hello world RocketMQ Demo01" + i).getBytes());
3636

37-
// 向broker发送消息
37+
// 向broker发送消息============================> 同步发送
3838
SendResult sendResult = producer.send(message);
3939
System.out.printf("%s%n", sendResult);
4040
}
Collapse file
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.bruis.rocketmqdemo.demo02;
2+
3+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
8+
import org.apache.rocketmq.common.message.MessageExt;
9+
import org.apache.rocketmq.remoting.common.RemotingHelper;
10+
11+
import java.util.List;
12+
13+
/**
14+
* @author lhy
15+
*
16+
* 普通消费者
17+
*
18+
* @date 2021/7/10
19+
*/
20+
public class Consumer {
21+
22+
public static final String DEMO01_CONSUMER_GROUP_NAME = "demo02_consumer_group_name";
23+
24+
public static void main(String[] args) throws Exception {
25+
26+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(DEMO01_CONSUMER_GROUP_NAME);
27+
consumer.setNamesrvAddr(Producer.NAMESRV_ADDRESS);
28+
// 从哪开始进行消费
29+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
30+
consumer.subscribe(Producer.TOPIC_NAME,"*");
31+
consumer.registerMessageListener(new MessageListenerConcurrently() {
32+
@Override
33+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
34+
MessageExt messageExt = msgs.get(0);
35+
try {
36+
String topic = messageExt.getTopic();
37+
String tags = messageExt.getTags();
38+
String keys = messageExt.getKeys();
39+
40+
if ("keyDuplicate".equals(keys)) {
41+
System.err.println("消息消费失败");
42+
int a = 1 / 0;
43+
}
44+
45+
String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
46+
System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
47+
} catch (Exception e) {
48+
e.printStackTrace();
49+
// 消费次数
50+
// int reconsumeTimes = messageExt.getReconsumeTimes();
51+
// System.err.println("reconsumeTimes: " + reconsumeTimes);
52+
// // 重试三次
53+
// if (reconsumeTimes == 3) {
54+
// // 日志记录
55+
// // 重试补偿成功
56+
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
57+
// }
58+
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
59+
}
60+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
61+
}
62+
});
63+
64+
consumer.start();
65+
System.err.println("Consumer start ....");
66+
}
67+
}
Collapse file
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.bruis.rocketmqdemo.demo02;
2+
3+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
4+
import org.apache.rocketmq.client.producer.SendCallback;
5+
import org.apache.rocketmq.client.producer.SendResult;
6+
import org.apache.rocketmq.common.message.Message;
7+
8+
/**
9+
* @author lhy
10+
*
11+
* 普通生产者(异步方式发送)
12+
*
13+
* @date 2021/7/10
14+
*/
15+
public class Producer {
16+
17+
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
18+
19+
// 生产者组
20+
public static final String DEMO01_PRODUCER_GROUP_NAME = "demo02_producer_group_name";
21+
22+
// topic
23+
public static final String TOPIC_NAME = "demo02_topic";
24+
25+
public static void main(String[] args) throws Exception {
26+
// 指定生产者组名称
27+
DefaultMQProducer producer = new DefaultMQProducer(DEMO01_PRODUCER_GROUP_NAME);
28+
producer.setNamesrvAddr(NAMESRV_ADDRESS);
29+
30+
producer.start();
31+
32+
for (int i = 0; i < 5; i++) {
33+
Message message = new Message(TOPIC_NAME,// topic
34+
"TagA",//tag
35+
"key" + i,//keys
36+
("Hello world RocketMQ Demo01" + i).getBytes());
37+
38+
// 向broker发送消息
39+
producer.send(message, new SendCallback() {
40+
@Override
41+
public void onSuccess(SendResult sendResult) {
42+
System.out.printf("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus());
43+
}
44+
45+
@Override
46+
public void onException(Throwable e) {
47+
e.printStackTrace();
48+
System.err.println("==============发送失败");
49+
}
50+
});
51+
}
52+
}
53+
54+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.