Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6f17e5c

Browse filesBrowse files
committed
添加顺序消费demo
1 parent 42f2315 commit 6f17e5c
Copy full SHA for 6f17e5c

File tree

Expand file treeCollapse file tree

3 files changed

+219
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+219
-0
lines changed
Open diff view settings
Collapse file

‎rocketmqdemo/README.md‎

Copy file name to clipboard
+7Lines changed: 7 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
## RocketMQDemo 说明
2+
3+
- demo01 普通生产者消费者
4+
5+
- demo02 普通生产者消费者(异步发送消息)
6+
7+
- demo03 顺序消费
Collapse file
+61Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.bruis.rocketmqdemo.demo03;
2+
3+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4+
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
6+
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
7+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
8+
import org.apache.rocketmq.common.message.MessageExt;
9+
10+
import java.util.List;
11+
import java.util.Random;
12+
import java.util.concurrent.TimeUnit;
13+
14+
/**
15+
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
16+
*
17+
* @author lhy
18+
* @date 2021/7/23
19+
*/
20+
public class Consumer {
21+
22+
public static void main(String[] args) throws Exception {
23+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Producer.GROUP_NAME);
24+
consumer.setNamesrvAddr(Producer.NAMESRV_ADDRESS);
25+
26+
/**
27+
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
28+
* 如果非第一次启动,那么按照上次消费的位置继续消费
29+
*/
30+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
31+
consumer.subscribe(Producer.TOPIC_NAME, "TagA || TagC || TagD");
32+
33+
/**
34+
* 顺序消费的消息类??MessageListenerOrderly
35+
*/
36+
consumer.registerMessageListener(new MessageListenerOrderly() {
37+
38+
Random random = new Random();
39+
40+
@Override
41+
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
42+
context.setAutoCommit(true);
43+
for (MessageExt msg : msgs) {
44+
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
45+
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
46+
}
47+
48+
try {
49+
//模拟业务逻辑处理中...
50+
TimeUnit.SECONDS.sleep(random.nextInt(10));
51+
} catch (Exception e) {
52+
e.printStackTrace();
53+
}
54+
return ConsumeOrderlyStatus.SUCCESS;
55+
}
56+
});
57+
58+
consumer.start();
59+
System.out.println("Consumer Started.");
60+
}
61+
}
Collapse file
+151Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package com.bruis.rocketmqdemo.demo03;
2+
3+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
4+
import org.apache.rocketmq.client.producer.SendResult;
5+
import org.apache.rocketmq.common.message.Message;
6+
7+
import java.text.SimpleDateFormat;
8+
import java.util.ArrayList;
9+
import java.util.Date;
10+
import java.util.List;
11+
12+
/**
13+
*
14+
* 顺序消息样例
15+
*
16+
* @author lhy
17+
* @date 2021/7/23
18+
*/
19+
public class Producer {
20+
21+
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
22+
23+
public static final String GROUP_NAME = "inorder_group_name";
24+
25+
public static final String TOPIC_NAME = "inorder_topic";
26+
27+
public static void main(String[] args) throws Exception {
28+
DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
29+
producer.setNamesrvAddr(NAMESRV_ADDRESS);
30+
31+
producer.start();
32+
33+
String[] tags = {"TagA", "TagC", "TagD"};
34+
35+
// 订单列表
36+
List<OrderStep> orderList = new Producer().buildOrders();
37+
Date date = new Date();
38+
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
39+
String dateStr = sdf.format(date);
40+
for (int i = 0; i < 10; i++) {
41+
// 时间前缀
42+
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
43+
Message msg = new Message(TOPIC_NAME, tags[i % tags.length], "KEY" + i, body.getBytes());
44+
45+
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
46+
Long id = (Long) arg; //根据订单id选择发送queue
47+
long index = id % mqs.size();
48+
return mqs.get((int) index);
49+
}, orderList.get(i).getOrderId());//订单id
50+
51+
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
52+
sendResult.getSendStatus(),
53+
sendResult.getMessageQueue().getQueueId(),
54+
body));
55+
}
56+
57+
producer.shutdown();
58+
}
59+
60+
/**
61+
* 订单的步骤
62+
*/
63+
private class OrderStep {
64+
private long orderId;
65+
private String desc;
66+
67+
public long getOrderId() {
68+
return orderId;
69+
}
70+
71+
public void setOrderId(long orderId) {
72+
this.orderId = orderId;
73+
}
74+
75+
public String getDesc() {
76+
return desc;
77+
}
78+
79+
public void setDesc(String desc) {
80+
this.desc = desc;
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return "OrderStep{" +
86+
"orderId=" + orderId +
87+
", desc='" + desc + '\'' +
88+
'}';
89+
}
90+
}
91+
92+
/**
93+
* 生成模拟订单数据
94+
*/
95+
private List<OrderStep> buildOrders() {
96+
List<OrderStep> orderList = new ArrayList<OrderStep>();
97+
98+
OrderStep orderDemo = new OrderStep();
99+
orderDemo.setOrderId(15103111039L);
100+
orderDemo.setDesc("创建");
101+
orderList.add(orderDemo);
102+
103+
orderDemo = new OrderStep();
104+
orderDemo.setOrderId(15103111065L);
105+
orderDemo.setDesc("创建");
106+
orderList.add(orderDemo);
107+
108+
orderDemo = new OrderStep();
109+
orderDemo.setOrderId(15103111039L);
110+
orderDemo.setDesc("付款");
111+
orderList.add(orderDemo);
112+
113+
orderDemo = new OrderStep();
114+
orderDemo.setOrderId(15103117235L);
115+
orderDemo.setDesc("创建");
116+
orderList.add(orderDemo);
117+
118+
orderDemo = new OrderStep();
119+
orderDemo.setOrderId(15103111065L);
120+
orderDemo.setDesc("付款");
121+
orderList.add(orderDemo);
122+
123+
orderDemo = new OrderStep();
124+
orderDemo.setOrderId(15103117235L);
125+
orderDemo.setDesc("付款");
126+
orderList.add(orderDemo);
127+
128+
orderDemo = new OrderStep();
129+
orderDemo.setOrderId(15103111065L);
130+
orderDemo.setDesc("完成");
131+
orderList.add(orderDemo);
132+
133+
orderDemo = new OrderStep();
134+
orderDemo.setOrderId(15103111039L);
135+
orderDemo.setDesc("推送");
136+
orderList.add(orderDemo);
137+
138+
orderDemo = new OrderStep();
139+
orderDemo.setOrderId(15103117235L);
140+
orderDemo.setDesc("完成");
141+
orderList.add(orderDemo);
142+
143+
orderDemo = new OrderStep();
144+
orderDemo.setOrderId(15103111039L);
145+
orderDemo.setDesc("完成");
146+
orderList.add(orderDemo);
147+
148+
return orderList;
149+
}
150+
151+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.