Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 79b8ced

Browse filesBrowse files
committed
[A] 基于Netty的短连接、长连接demo
1 parent 7dc4b25 commit 79b8ced
Copy full SHA for 79b8ced

File tree

Expand file treeCollapse file tree

9 files changed

+495
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

9 files changed

+495
-0
lines changed
Open diff view settings
Collapse file
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.bruis.learnnetty.netty.connections.longconnections;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.channel.ChannelInboundHandlerAdapter;
6+
7+
/**
8+
* @author lhy
9+
* @date 2022/2/11
10+
*/
11+
public class ClientHandler extends ChannelInboundHandlerAdapter {
12+
@Override
13+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
14+
Response response = JSONObject.parseObject(msg.toString(), Response.class);
15+
RequestFuture.received(response);
16+
}
17+
}
Collapse file
+91Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.bruis.learnnetty.netty.connections.longconnections;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.netty.connections.longconnections.ClientHandler;
5+
import io.netty.bootstrap.Bootstrap;
6+
import io.netty.buffer.PooledByteBufAllocator;
7+
import io.netty.channel.ChannelFuture;
8+
import io.netty.channel.ChannelInitializer;
9+
import io.netty.channel.ChannelOption;
10+
import io.netty.channel.EventLoopGroup;
11+
import io.netty.channel.nio.NioEventLoopGroup;
12+
import io.netty.channel.socket.nio.NioSocketChannel;
13+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
14+
import io.netty.handler.codec.LengthFieldPrepender;
15+
import io.netty.handler.codec.string.StringDecoder;
16+
import io.netty.handler.codec.string.StringEncoder;
17+
import org.springframework.util.StringUtils;
18+
19+
import java.nio.charset.Charset;
20+
21+
/**
22+
* @author lhy
23+
* @date 2022/2/16
24+
*/
25+
public class NettyClient {
26+
public static EventLoopGroup group = null;
27+
public static Bootstrap bootstrap = null;
28+
public static ChannelFuture future = null;
29+
static {
30+
bootstrap = new Bootstrap();
31+
group = new NioEventLoopGroup();
32+
bootstrap.channel(NioSocketChannel.class);
33+
bootstrap.group(group);
34+
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
35+
final ClientHandler clientHandler = new ClientHandler();
36+
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
37+
@Override
38+
protected void initChannel(NioSocketChannel ch) throws Exception {
39+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
40+
0, 4, 0, 4));
41+
ch.pipeline().addLast(new StringDecoder());
42+
ch.pipeline().addLast(clientHandler);
43+
ch.pipeline().addLast(new LengthFieldPrepender(4, false));
44+
ch.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
45+
}
46+
});
47+
try {
48+
future = bootstrap.connect("127.0.0.1", 8080).sync();
49+
} catch (InterruptedException e) {
50+
e.printStackTrace();
51+
}
52+
}
53+
54+
/**
55+
* 说明:对于这个长连接的例子中,使用了静态化,即单链接、长连接,如果是多链接的话不可使用静态化,需使用线程池。
56+
* @param msg
57+
* @return
58+
*/
59+
public Object sendRequest(Object msg) {
60+
try {
61+
RequestFuture request = new RequestFuture();
62+
request.setRequest(msg);
63+
String requestStr = JSONObject.toJSONString(request);
64+
future.channel().writeAndFlush(requestStr);
65+
myselfPrint("我阻塞了", null);
66+
return request.get();
67+
} catch (Exception e) {
68+
e.printStackTrace();
69+
throw e;
70+
}
71+
}
72+
public static void main(String[] args) {
73+
NettyClient nettyClient = new NettyClient();
74+
for (int i = 0; i < 10; i++) {
75+
Object result = nettyClient.sendRequest("hello");
76+
myselfPrint("拿到结果了", result);
77+
}
78+
}
79+
80+
public static void myselfPrint(String description, Object value) {
81+
StringBuilder builder = new StringBuilder();
82+
builder.append(Thread.currentThread().getName());
83+
if (!StringUtils.isEmpty(description)) {
84+
builder.append("-").append(description);
85+
}
86+
if (!StringUtils.isEmpty(value)) {
87+
builder.append("-").append(value);
88+
}
89+
System.out.println(builder.toString());
90+
}
91+
}
Collapse file
+52Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.bruis.learnnetty.netty.connections.longconnections;
2+
3+
import com.bruis.learnnetty.netty.connections.shortconnections.ServerHandler;
4+
import io.netty.bootstrap.ServerBootstrap;
5+
import io.netty.channel.ChannelFuture;
6+
import io.netty.channel.ChannelInitializer;
7+
import io.netty.channel.ChannelOption;
8+
import io.netty.channel.EventLoopGroup;
9+
import io.netty.channel.nio.NioEventLoopGroup;
10+
import io.netty.channel.socket.SocketChannel;
11+
import io.netty.channel.socket.nio.NioServerSocketChannel;
12+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
13+
import io.netty.handler.codec.LengthFieldPrepender;
14+
import io.netty.handler.codec.string.StringDecoder;
15+
import io.netty.handler.codec.string.StringEncoder;
16+
17+
/**
18+
* 基于短连接的Netty服务端
19+
*
20+
* @author lhy
21+
* @date 2022/2/11
22+
*/
23+
public class NettyServer {
24+
public static void main(String[] args) {
25+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
26+
EventLoopGroup workerGroup = new NioEventLoopGroup();
27+
try {
28+
ServerBootstrap serverBootstrap = new ServerBootstrap();
29+
serverBootstrap.group(bossGroup, workerGroup);
30+
serverBootstrap.channel(NioServerSocketChannel.class);
31+
32+
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
33+
.childHandler(new ChannelInitializer<SocketChannel>() {
34+
@Override
35+
protected void initChannel(SocketChannel ch) throws Exception {
36+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
37+
.addLast(new StringDecoder())
38+
.addLast(new ServerHandler())
39+
.addLast(new LengthFieldPrepender(4, false))
40+
.addLast(new StringEncoder());
41+
}
42+
});
43+
ChannelFuture future = serverBootstrap.bind(8080).sync();
44+
future.channel().closeFuture().sync();
45+
} catch (Exception e) {
46+
e.printStackTrace();
47+
} finally {
48+
bossGroup.shutdownGracefully();
49+
workerGroup.shutdownGracefully();
50+
}
51+
}
52+
}
Collapse file
+130Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package com.bruis.learnnetty.netty.connections.longconnections;
2+
3+
import java.util.Map;
4+
import java.util.Objects;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
import java.util.concurrent.locks.Condition;
9+
import java.util.concurrent.locks.Lock;
10+
import java.util.concurrent.locks.ReentrantLock;
11+
12+
/**
13+
* 模拟客户端请求类,用于构建请求对象
14+
*
15+
* @author lhy
16+
* @date 2022/2/10
17+
*/
18+
public class RequestFuture {
19+
public static Map<Long, RequestFuture> futures = new ConcurrentHashMap<>();
20+
private final Lock lock = new ReentrantLock();
21+
private final Condition condition = lock.newCondition();
22+
private long id;
23+
/**
24+
* 请求参数
25+
*/
26+
private Object request;
27+
/**
28+
* 响应结果
29+
*/
30+
private Object result;
31+
/**
32+
* 超时时间
33+
*/
34+
private long timeout = 5000;
35+
public static final AtomicLong aid = new AtomicLong();
36+
37+
public RequestFuture() {
38+
id = aid.incrementAndGet();
39+
addFuture(this);
40+
}
41+
42+
/**
43+
* 把请求放入本地缓存中
44+
* @param future
45+
*/
46+
public static void addFuture(RequestFuture future) {
47+
futures.put(future.getId(), future);
48+
}
49+
50+
/**
51+
* 同步获取响应结果
52+
* @return
53+
*/
54+
public Object get() {
55+
lock.lock();
56+
try {
57+
while (this.result == null) {
58+
try {
59+
// 主线程默认等待5s,然后查看下结果
60+
condition.await(timeout, TimeUnit.MILLISECONDS);
61+
} catch (InterruptedException e) {
62+
e.printStackTrace();
63+
}
64+
}
65+
} finally {
66+
lock.unlock();
67+
}
68+
return this.result;
69+
}
70+
71+
/**
72+
* 表明服务端发送过来的结果已经接收到了,可以signal了
73+
* @param result
74+
*/
75+
public static void received(Response result) {
76+
RequestFuture future = futures.remove(result.getId());
77+
if (null != future) {
78+
future.setResult(result.getResult());
79+
}
80+
/**
81+
* 通知主线程
82+
*/
83+
Objects.requireNonNull(future, "RequestFuture").getLock().lock();
84+
try {
85+
future.getCondition().signalAll();
86+
} finally {
87+
Objects.requireNonNull(future, "RequestFuture").getLock().unlock();
88+
}
89+
}
90+
91+
public long getId() {
92+
return id;
93+
}
94+
95+
public void setId(long id) {
96+
this.id = id;
97+
}
98+
99+
public Object getRequest() {
100+
return request;
101+
}
102+
103+
public void setRequest(Object request) {
104+
this.request = request;
105+
}
106+
107+
public Object getResult() {
108+
return result;
109+
}
110+
111+
public void setResult(Object result) {
112+
this.result = result;
113+
}
114+
115+
public long getTimeout() {
116+
return timeout;
117+
}
118+
119+
public void setTimeout(long timeout) {
120+
this.timeout = timeout;
121+
}
122+
123+
public Lock getLock() {
124+
return lock;
125+
}
126+
127+
public Condition getCondition() {
128+
return condition;
129+
}
130+
}
Collapse file
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.bruis.learnnetty.netty.connections.longconnections;
2+
3+
/**
4+
* 响应结果类
5+
*
6+
* @author lhy
7+
* @date 2022/2/10
8+
*/
9+
public class Response {
10+
private long id;
11+
private Object result;
12+
13+
public long getId() {
14+
return id;
15+
}
16+
17+
public void setId(long id) {
18+
this.id = id;
19+
}
20+
21+
public Object getResult() {
22+
return result;
23+
}
24+
25+
public void setResult(Object result) {
26+
this.result = result;
27+
}
28+
}
Collapse file
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.bruis.learnnetty.netty.connections.shortconnections;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.thread.synchronize.Response;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import io.netty.channel.ChannelInboundHandlerAdapter;
7+
import io.netty.util.concurrent.Promise;
8+
9+
/**
10+
* @author lhy
11+
* @date 2022/2/11
12+
*/
13+
public class ClientHandler extends ChannelInboundHandlerAdapter {
14+
private Promise<Response> promise;
15+
@Override
16+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
17+
Response response = JSONObject.parseObject(msg.toString(), Response.class);
18+
promise.setSuccess(response);
19+
}
20+
21+
public Promise<Response> getPromise() {
22+
return promise;
23+
}
24+
25+
public void setPromise(Promise<Response> promise) {
26+
this.promise = promise;
27+
}
28+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.