Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit aa8cf9b

Browse filesBrowse files
committed
添加访Rocket心跳机制
1 parent b1748fb commit aa8cf9b
Copy full SHA for aa8cf9b

File tree

Expand file treeCollapse file tree

8 files changed

+405
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

8 files changed

+405
-0
lines changed
Open diff view settings
Collapse file

‎Spring-Netty/pom.xml‎

Copy file name to clipboardExpand all lines: Spring-Netty/pom.xml
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@
3838
<version>${netty-all.version}</version>
3939
</dependency>
4040

41+
<dependency>
42+
<groupId>com.alibaba</groupId>
43+
<artifactId>fastjson</artifactId>
44+
<version>1.2.76</version>
45+
</dependency>
46+
4147
</dependencies>
4248

4349
<build>
Collapse file
+100Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.*;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.channel.socket.nio.NioSocketChannel;
8+
import io.netty.util.concurrent.DefaultEventExecutorGroup;
9+
10+
import java.net.InetSocketAddress;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.ScheduledExecutorService;
13+
import java.util.concurrent.ThreadFactory;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
/**
18+
* @author lhy
19+
* @date 2021/8/19
20+
*/
21+
public class ClientTest {
22+
23+
public static final EventLoopGroup myEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() {
24+
25+
private AtomicInteger threadIndex = new AtomicInteger(0);
26+
27+
@Override
28+
public Thread newThread(Runnable r) {
29+
return new Thread(r, String.format("MyNettyClientSelector_%d", this.threadIndex.incrementAndGet()));
30+
}
31+
});
32+
33+
public static final DefaultEventExecutorGroup nettyHandlerExecutorGroup = new DefaultEventExecutorGroup(1,
34+
new ThreadFactory() {
35+
private AtomicInteger threadIndex = new AtomicInteger(0);
36+
@Override
37+
public Thread newThread(Runnable r) {
38+
return new Thread(r, "nettyHandlerThread_" + this.threadIndex.incrementAndGet());
39+
}
40+
});
41+
42+
public static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
43+
@Override
44+
public Thread newThread(Runnable r) {
45+
Thread thread = new Thread(r, "scheduledThread_");
46+
thread.setDaemon(false);
47+
return thread;
48+
}
49+
});
50+
51+
public static void main(String[] args) {
52+
53+
Bootstrap bootstrap = new Bootstrap()
54+
.group(myEventLoopGroup)
55+
.channel(NioSocketChannel.class)
56+
.option(ChannelOption.TCP_NODELAY, true)
57+
.option(ChannelOption.SO_KEEPALIVE, false)
58+
.option(ChannelOption.SO_SNDBUF, 65535)
59+
.option(ChannelOption.SO_RCVBUF, 65535)
60+
.handler(new ChannelInitializer<SocketChannel>() {
61+
@Override
62+
protected void initChannel(SocketChannel ch) throws Exception {
63+
ChannelPipeline pipeline = ch.pipeline();
64+
pipeline.addLast(nettyHandlerExecutorGroup,
65+
new NettyEncoder(),
66+
new NettyDecoder(),
67+
new ConnectResponseHandler());
68+
}
69+
});
70+
71+
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 9090);
72+
73+
final ChannelFuture channelFuture = bootstrap.connect(inetSocketAddress);
74+
75+
if (channelFuture.awaitUninterruptibly(2, TimeUnit.MINUTES)) {
76+
// heartBeat(channelFuture.channel());
77+
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
78+
@Override
79+
public void run() {
80+
heartBeat(channelFuture.channel());
81+
}
82+
}, 1000, 30 * 1000, TimeUnit.MILLISECONDS);
83+
}
84+
}
85+
86+
public static void heartBeat(Channel channel) {
87+
String request = "客户端发起了心跳请求";
88+
RemotingCommand command= new RemotingCommand();
89+
command.setBody(request.getBytes());
90+
command.setCode(1);
91+
channel.writeAndFlush(command);
92+
}
93+
94+
public static class ConnectResponseHandler extends SimpleChannelInboundHandler<RemotingCommand> {
95+
@Override
96+
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
97+
System.out.println("服务端返回消息了:" + new String(msg.getBody()));
98+
}
99+
}
100+
}
Collapse file
+40Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
6+
7+
import java.nio.ByteBuffer;
8+
9+
/**
10+
* @author lhy
11+
* @date 2021/8/19
12+
*/
13+
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
14+
15+
public NettyDecoder() {
16+
super(16777216, 0, 4, 0, 4);
17+
}
18+
19+
@Override
20+
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
21+
ByteBuf frame = null;
22+
23+
try {
24+
frame = (ByteBuf) super.decode(ctx, in);
25+
if (null == frame) {
26+
return null;
27+
}
28+
ByteBuffer byteBuffer = frame.nioBuffer();
29+
return RemotingCommand.decode(byteBuffer);
30+
} catch (Exception e) {
31+
e.printStackTrace();
32+
} finally {
33+
if (null != frame) {
34+
frame.release();
35+
}
36+
}
37+
38+
return null;
39+
}
40+
}
Collapse file
+39Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandler;
7+
import io.netty.channel.ChannelHandlerContext;
8+
import io.netty.handler.codec.MessageToByteEncoder;
9+
10+
import java.nio.ByteBuffer;
11+
12+
/**
13+
* @author lhy
14+
* @date 2021/8/19
15+
*/
16+
@ChannelHandler.Sharable
17+
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
18+
19+
@Override
20+
protected void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception {
21+
try {
22+
ByteBuffer header = remotingCommand.encodeHeader();
23+
out.writeBytes(header);
24+
byte[] body = remotingCommand.getBody();
25+
if (null != body) {
26+
out.writeBytes(body);
27+
}
28+
// out.writeBytes(remotingCommand.getBody());
29+
} catch (Exception e) {
30+
e.printStackTrace();
31+
ctx.channel().close().addListener(new ChannelFutureListener() {
32+
@Override
33+
public void operationComplete(ChannelFuture future) throws Exception {
34+
// 关闭channel成功
35+
}
36+
});
37+
}
38+
}
39+
}
Collapse file
+87Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import java.nio.ByteBuffer;
4+
5+
/**
6+
* @author lhy
7+
* @date 2021/8/19
8+
*/
9+
public class RemotingCommand {
10+
11+
private Integer code; // 请求码
12+
13+
private byte[] body; // 请求内容
14+
15+
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
16+
int limit = byteBuffer.limit();
17+
18+
int oriHeaderLen = byteBuffer.getInt();
19+
int headerLength = getHeaderLength(oriHeaderLen);
20+
21+
byte[] headerData = new byte[headerLength];
22+
byteBuffer.get(headerData);
23+
24+
int bodyLength = limit - 4 - headerLength;
25+
26+
byte[] body = new byte[bodyLength];
27+
byteBuffer.get(body);
28+
RemotingCommand remotingCommand = new RemotingCommand();
29+
remotingCommand.setBody(body);
30+
return remotingCommand;
31+
}
32+
33+
public ByteBuffer encodeHeader() {
34+
return encodeHeader(this.body.length);
35+
}
36+
37+
public ByteBuffer encodeHeader(final int bodyLength) {
38+
int length = 4;
39+
40+
byte[] headerData;
41+
headerData = this.headerEncode();
42+
length += headerData.length; // 头
43+
length += bodyLength; // 请求/响应体
44+
45+
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 分配header
46+
result.putInt(length);
47+
result.put(markProtocolType(headerData.length, SerializeType.JSON));
48+
result.put(headerData); // 添加头
49+
result.flip();
50+
51+
return result;
52+
}
53+
54+
public static byte[] markProtocolType(int source, SerializeType type) {
55+
byte[] result = new byte[4];
56+
57+
result[0] = type.getCode();
58+
result[1] = (byte) ((source >> 16) & 0xFF);
59+
result[2] = (byte) ((source >> 8) & 0xFF);
60+
result[3] = (byte) (source & 0xFF);
61+
return result;
62+
}
63+
64+
private byte[] headerEncode() {
65+
return RemotingSerializable.encode(this);
66+
}
67+
68+
public static int getHeaderLength(int length) {
69+
return length & 0xFFFFFF;
70+
}
71+
72+
public Integer getCode() {
73+
return code;
74+
}
75+
76+
public void setCode(Integer code) {
77+
this.code = code;
78+
}
79+
80+
public byte[] getBody() {
81+
return body;
82+
}
83+
84+
public void setBody(byte[] body) {
85+
this.body = body;
86+
}
87+
}
Collapse file
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
import com.alibaba.fastjson.JSON;
4+
5+
import java.nio.charset.Charset;
6+
7+
/**
8+
* @author lhy
9+
* @date 2021/8/19
10+
*/
11+
public abstract class RemotingSerializable {
12+
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
13+
14+
public static byte[] encode(final Object obj) {
15+
final String json = toJson(obj, false);
16+
if (json != null) {
17+
return json.getBytes(CHARSET_UTF8);
18+
}
19+
return null;
20+
}
21+
22+
public static String toJson(final Object obj, boolean prettyFormat) {
23+
return JSON.toJSONString(obj, prettyFormat);
24+
}
25+
26+
public static <T> T decode(final byte[] data, Class<T> classOfT) {
27+
final String json = new String(data, CHARSET_UTF8);
28+
return fromJson(json, classOfT);
29+
}
30+
31+
public static <T> T fromJson(String json, Class<T> classOfT) {
32+
return JSON.parseObject(json, classOfT);
33+
}
34+
35+
public byte[] encode() {
36+
final String json = this.toJson();
37+
if (json != null) {
38+
return json.getBytes(CHARSET_UTF8);
39+
}
40+
return null;
41+
}
42+
43+
public String toJson() {
44+
return toJson(false);
45+
}
46+
47+
public String toJson(final boolean prettyFormat) {
48+
return toJson(this, prettyFormat);
49+
}
50+
}
Collapse file
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.bruis.learnnetty.netty.heartbeat;
2+
3+
/**
4+
* @author lhy
5+
* @date 2021/8/20
6+
*/
7+
public enum SerializeType {
8+
JSON((byte) 0);
9+
10+
private byte code;
11+
12+
SerializeType(byte code) {
13+
this.code = code;
14+
}
15+
16+
public static SerializeType valueOf(byte code) {
17+
for (SerializeType serializeType : SerializeType.values()) {
18+
if (serializeType.getCode() == code) {
19+
return serializeType;
20+
}
21+
}
22+
return null;
23+
}
24+
25+
public byte getCode() {
26+
return code;
27+
}
28+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.