Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 672fb25

Browse filesBrowse files
committed
[A] 添加Netty通讯模拟代码,通过多线程模拟Netty通讯过程;
1 parent dcacc8a commit 672fb25
Copy full SHA for 672fb25

File tree

Expand file treeCollapse file tree

8 files changed

+471
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

8 files changed

+471
-0
lines changed
Open diff view settings
Collapse file
+64Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.learnjava.thread.reentranlock;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
/**
7+
* 模拟Netty通讯过程
8+
* 主线程,获取子线程的结果
9+
*
10+
* @author lhy
11+
* @date 2022/2/10
12+
*/
13+
public class FutureMain {
14+
private static List<RequestFuture> reqs = new ArrayList<>();
15+
public static void main(String[] args) {
16+
mockClient();
17+
mockServer();
18+
}
19+
20+
/**
21+
* 模拟服务端 接受结果
22+
*/
23+
private static void mockServer() {
24+
for (RequestFuture req : reqs) {
25+
/**
26+
* 主线程获取结果
27+
*/
28+
Object result = req.get();
29+
System.out.println("服务端接受到响应结果:" + result.toString());
30+
}
31+
}
32+
/**
33+
* 模拟客户端 发送请求
34+
*/
35+
private static void mockClient() {
36+
for (int i = 0; i < 100; i++) {
37+
long id = i;
38+
RequestFuture req = new RequestFuture();
39+
req.setId(id);
40+
req.setRequest("hello world");
41+
/**
42+
* 把请求缓存起来
43+
*/
44+
RequestFuture.addFuture(req);
45+
/**
46+
* 将请求放入到请求列表中
47+
*/
48+
reqs.add(req);
49+
sendMsg(req);
50+
SubThread subThread = new SubThread(req);
51+
/**
52+
* 开启子线程
53+
*/
54+
subThread.start();
55+
}
56+
}
57+
/**
58+
* 模拟请求处理
59+
* @param req
60+
*/
61+
private static void sendMsg(RequestFuture req) {
62+
System.out.println("客户端发送数据,请求id为===============" + req.getId());
63+
}
64+
}
Collapse file
+123Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.learnjava.thread.reentranlock;
2+
3+
import java.util.Map;
4+
import java.util.Objects;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.locks.Condition;
8+
import java.util.concurrent.locks.Lock;
9+
import java.util.concurrent.locks.ReentrantLock;
10+
11+
/**
12+
* 模拟客户端请求类,用于构建请求对象
13+
*
14+
* @author lhy
15+
* @date 2022/2/10
16+
*/
17+
public class RequestFuture {
18+
public static Map<Long, RequestFuture> futures = new ConcurrentHashMap<>();
19+
private final Lock lock = new ReentrantLock();
20+
private final Condition condition = lock.newCondition();
21+
private long id;
22+
/**
23+
* 请求参数
24+
*/
25+
private Object request;
26+
/**
27+
* 响应结果
28+
*/
29+
private Object result;
30+
/**
31+
* 超时时间
32+
*/
33+
private long timeout = 5000;
34+
35+
/**
36+
* 把请求放入本地缓存中
37+
* @param future
38+
*/
39+
public static void addFuture(RequestFuture future) {
40+
futures.put(future.getId(), future);
41+
}
42+
43+
/**
44+
* 同步获取响应结果
45+
* @return
46+
*/
47+
public Object get() {
48+
lock.lock();
49+
try {
50+
while (this.result == null) {
51+
try {
52+
// 主线程默认等待5s,然后查看下结果
53+
condition.await(timeout, TimeUnit.MILLISECONDS);
54+
} catch (InterruptedException e) {
55+
e.printStackTrace();
56+
}
57+
}
58+
} finally {
59+
lock.unlock();
60+
}
61+
return this.result;
62+
}
63+
64+
/**
65+
* 异步线程将结果返回主线程
66+
* @param result
67+
*/
68+
public static void received(Response result) {
69+
RequestFuture future = futures.remove(result.getId());
70+
if (null != future) {
71+
future.setResult(result.getResult());
72+
}
73+
/**
74+
* 通知主线程
75+
*/
76+
Objects.requireNonNull(future, "RequestFuture").getLock().lock();
77+
try {
78+
future.getCondition().signalAll();
79+
} finally {
80+
Objects.requireNonNull(future, "RequestFuture").getLock().unlock();
81+
}
82+
}
83+
84+
public long getId() {
85+
return id;
86+
}
87+
88+
public void setId(long id) {
89+
this.id = id;
90+
}
91+
92+
public Object getRequest() {
93+
return request;
94+
}
95+
96+
public void setRequest(Object request) {
97+
this.request = request;
98+
}
99+
100+
public Object getResult() {
101+
return result;
102+
}
103+
104+
public void setResult(Object result) {
105+
this.result = result;
106+
}
107+
108+
public long getTimeout() {
109+
return timeout;
110+
}
111+
112+
public void setTimeout(long timeout) {
113+
this.timeout = timeout;
114+
}
115+
116+
public Lock getLock() {
117+
return lock;
118+
}
119+
120+
public Condition getCondition() {
121+
return condition;
122+
}
123+
}
Collapse file
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.learnjava.thread.reentranlock;
2+
3+
/**
4+
* 响应结果类
5+
*
6+
* @author lhy
7+
* @date 2022/2/10
8+
*/
9+
public class Response {
10+
private long id;
11+
private Object result;
12+
13+
public long getId() {
14+
return id;
15+
}
16+
17+
public void setId(long id) {
18+
this.id = id;
19+
}
20+
21+
public Object getResult() {
22+
return result;
23+
}
24+
25+
public void setResult(Object result) {
26+
this.result = result;
27+
}
28+
}
Collapse file
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.learnjava.thread.reentranlock;
2+
3+
/**
4+
* 子线程,用于模拟服务端处理
5+
*
6+
* @author lhy
7+
* @date 2022/2/10
8+
*/
9+
public class SubThread extends Thread {
10+
11+
private RequestFuture request;
12+
13+
public SubThread(RequestFuture request) {
14+
this.request = request;
15+
}
16+
17+
@Override
18+
public void run() {
19+
Response response = new Response();
20+
response.setId(request.getId());
21+
response.setResult("服务端响应了结果,线程id: " + Thread.currentThread().getId() + ", 请求id:" + response.getId());
22+
// 子线程睡眠1s
23+
try {
24+
Thread.sleep(1000);
25+
} catch (InterruptedException e) {
26+
e.printStackTrace();
27+
}
28+
RequestFuture.received(response);
29+
}
30+
}
Collapse file
+64Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.learnjava.thread.synchronize;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
/**
7+
* 模拟Netty通讯过程
8+
* 主线程,获取子线程的结果
9+
*
10+
* @author lhy
11+
* @date 2022/2/10
12+
*/
13+
public class FutureMain {
14+
private static List<RequestFuture> reqs = new ArrayList<>();
15+
public static void main(String[] args) {
16+
mockClient();
17+
mockServer();
18+
}
19+
20+
/**
21+
* 模拟服务端 接受结果
22+
*/
23+
private static void mockServer() {
24+
for (RequestFuture req : reqs) {
25+
/**
26+
* 主线程获取结果
27+
*/
28+
Object result = req.get();
29+
System.out.println("服务端接受到响应结果:" + result.toString());
30+
}
31+
}
32+
/**
33+
* 模拟客户端 发送请求
34+
*/
35+
private static void mockClient() {
36+
for (int i = 0; i < 100; i++) {
37+
long id = i;
38+
RequestFuture req = new RequestFuture();
39+
req.setId(id);
40+
req.setRequest("hello world");
41+
/**
42+
* 把请求缓存起来
43+
*/
44+
RequestFuture.addFuture(req);
45+
/**
46+
* 将请求放入到请求列表中
47+
*/
48+
reqs.add(req);
49+
sendMsg(req);
50+
SubThread subThread = new SubThread(req);
51+
/**
52+
* 开启子线程
53+
*/
54+
subThread.start();
55+
}
56+
}
57+
/**
58+
* 模拟请求处理
59+
* @param req
60+
*/
61+
private static void sendMsg(RequestFuture req) {
62+
System.out.println("客户端发送数据,请求id为===============" + req.getId());
63+
}
64+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.