Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 437fc31

Browse filesBrowse files
committed
[A] 基于Netty实现的单机版RPC
1 parent 79b8ced commit 437fc31
Copy full SHA for 437fc31

File tree

Expand file treeCollapse file tree

14 files changed

+593
-2
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

14 files changed

+593
-2
lines changed
Open diff view settings
Collapse file

‎…uis/learnnetty/nio/demo01/NIOClient.java‎ ‎…/com/bruis/learnnetty/nio/NIOClient.java‎Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOClient.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOClient.java Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOClient.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOClient.java

Copy file name to clipboardExpand all lines: Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOClient.java
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.bruis.learnnetty.nio.demo01;
1+
package com.bruis.learnnetty.nio;
22

33
import java.net.InetSocketAddress;
44
import java.nio.ByteBuffer;
Collapse file

‎…uis/learnnetty/nio/demo01/NIOServer.java‎ ‎…/com/bruis/learnnetty/nio/NIOServer.java‎Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOServer.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOServer.java Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOServer.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOServer.java

Copy file name to clipboardExpand all lines: Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOServer.java
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.bruis.learnnetty.nio.demo01;
1+
package com.bruis.learnnetty.nio;
22

33
import java.net.InetSocketAddress;
44
import java.nio.ByteBuffer;
Collapse file
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.bruis.learnnetty.rpc.client;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
5+
import com.bruis.learnnetty.rpc.utils.Response;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
9+
/**
10+
* @author lhy
11+
* @date 2022/2/11
12+
*/
13+
public class ClientHandler extends ChannelInboundHandlerAdapter {
14+
@Override
15+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
16+
Response response = JSONObject.parseObject(msg.toString(), Response.class);
17+
RequestFuture.received(response);
18+
}
19+
}
Collapse file
+92Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.bruis.learnnetty.rpc.client;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
5+
import io.netty.bootstrap.Bootstrap;
6+
import io.netty.buffer.PooledByteBufAllocator;
7+
import io.netty.channel.ChannelFuture;
8+
import io.netty.channel.ChannelInitializer;
9+
import io.netty.channel.ChannelOption;
10+
import io.netty.channel.EventLoopGroup;
11+
import io.netty.channel.nio.NioEventLoopGroup;
12+
import io.netty.channel.socket.nio.NioSocketChannel;
13+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
14+
import io.netty.handler.codec.LengthFieldPrepender;
15+
import io.netty.handler.codec.string.StringDecoder;
16+
import io.netty.handler.codec.string.StringEncoder;
17+
import org.springframework.util.StringUtils;
18+
19+
import java.nio.charset.Charset;
20+
21+
/**
22+
* @author lhy
23+
* @date 2022/2/16
24+
*/
25+
public class NettyClient {
26+
public static EventLoopGroup group = null;
27+
public static Bootstrap bootstrap = null;
28+
public static ChannelFuture future = null;
29+
static {
30+
bootstrap = new Bootstrap();
31+
group = new NioEventLoopGroup();
32+
bootstrap.channel(NioSocketChannel.class);
33+
bootstrap.group(group);
34+
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
35+
final ClientHandler clientHandler = new ClientHandler();
36+
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
37+
@Override
38+
protected void initChannel(NioSocketChannel ch) throws Exception {
39+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
40+
0, 4, 0, 4));
41+
ch.pipeline().addLast(new StringDecoder());
42+
ch.pipeline().addLast(clientHandler);
43+
ch.pipeline().addLast(new LengthFieldPrepender(4, false));
44+
ch.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
45+
}
46+
});
47+
try {
48+
future = bootstrap.connect("127.0.0.1", 8080).sync();
49+
} catch (InterruptedException e) {
50+
e.printStackTrace();
51+
}
52+
}
53+
54+
/**
55+
* 说明:对于这个长连接的例子中,使用了静态化,即单链接、长连接,如果是多链接的话不可使用静态化,需使用线程池。
56+
* @param msg
57+
* @return
58+
*/
59+
public Object sendRequest(Object msg, String path) {
60+
try {
61+
RequestFuture request = new RequestFuture();
62+
request.setRequest(msg);
63+
request.setPath(path);
64+
String requestStr = JSONObject.toJSONString(request);
65+
future.channel().writeAndFlush(requestStr);
66+
myselfPrint("我阻塞了", null);
67+
return request.get();
68+
} catch (Exception e) {
69+
e.printStackTrace();
70+
throw e;
71+
}
72+
}
73+
public static void main(String[] args) {
74+
NettyClient nettyClient = new NettyClient();
75+
for (int i = 0; i < 10; i++) {
76+
Object result = nettyClient.sendRequest("hello-" + i, "getUserNameById");
77+
myselfPrint("拿到结果了", result);
78+
}
79+
}
80+
81+
public static void myselfPrint(String description, Object value) {
82+
StringBuilder builder = new StringBuilder();
83+
builder.append(Thread.currentThread().getName());
84+
if (!StringUtils.isEmpty(description)) {
85+
builder.append("-").append(description);
86+
}
87+
if (!StringUtils.isEmpty(value)) {
88+
builder.append("-").append(value);
89+
}
90+
System.out.println(builder.toString());
91+
}
92+
}
Collapse file
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.bruis.learnnetty.rpc.controller;
2+
3+
import com.bruis.learnnetty.rpc.utils.Remote;
4+
import org.springframework.stereotype.Controller;
5+
6+
/**
7+
* @author lhy
8+
* @date 2022/2/17
9+
*/
10+
@Controller
11+
public class UserController {
12+
@Remote(value = "getUserNameById")
13+
public Object getUserNameById(String userId) {
14+
System.out.println(Thread.currentThread().getName() + "-> 接受到请求:" + userId);
15+
return "做了业务处理了,结果是用户编号userId为" + userId + "的用户姓名为张三";
16+
}
17+
}
Collapse file
+38Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
4+
5+
/**
6+
* @author lhy
7+
* @date 2022/2/17
8+
*/
9+
public class ApplicationMain {
10+
11+
private static volatile boolean running = true;
12+
13+
public static void main(String[] args) {
14+
try {
15+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.bruis.learnnetty.rpc");
16+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
17+
try {
18+
context.stop();
19+
} catch (Exception e) {}
20+
21+
synchronized (ApplicationMain.class) {
22+
running = false;
23+
ApplicationMain.class.notify();
24+
}
25+
}));
26+
context.start();
27+
} catch (Exception e) {
28+
e.printStackTrace();
29+
System.exit(1);
30+
}
31+
System.out.println("服务器已启动");
32+
synchronized (ApplicationMain.class) {
33+
try {
34+
ApplicationMain.class.wait();
35+
} catch (Exception e) {}
36+
}
37+
}
38+
}
Collapse file
+55Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import com.bruis.learnnetty.rpc.utils.Mediator;
4+
import com.bruis.learnnetty.rpc.utils.Remote;
5+
import org.springframework.context.ApplicationListener;
6+
import org.springframework.context.event.ContextRefreshedEvent;
7+
import org.springframework.core.Ordered;
8+
import org.springframework.stereotype.Component;
9+
import org.springframework.stereotype.Controller;
10+
11+
import java.lang.reflect.Method;
12+
import java.util.Map;
13+
14+
/**
15+
* @author lhy
16+
* @date 2022/2/17
17+
*/
18+
@Component
19+
public class InitLoadRemoteMethod implements ApplicationListener<ContextRefreshedEvent>, Ordered {
20+
21+
@Override
22+
public void onApplicationEvent(ContextRefreshedEvent context) {
23+
// 获取Spring容器中带有@Controller的注解类
24+
Map<String, Object> controllerBeans = context.getApplicationContext()
25+
.getBeansWithAnnotation(Controller.class);
26+
for (String beanName : controllerBeans.keySet()) {
27+
Object beanObj = controllerBeans.get(beanName);
28+
// 获取这个bean的方法集合
29+
Method[] methods = beanObj.getClass().getMethods();
30+
for (Method method : methods) {
31+
// 判断这个方法是否带有@Remote注解
32+
if (method.isAnnotationPresent(Remote.class)) {
33+
Remote remote = method.getAnnotation(Remote.class);
34+
// 注解的值
35+
String remoteValue = remote.value();
36+
// 缓存这个类
37+
Mediator.MethodBean methodBean = new Mediator.MethodBean();
38+
methodBean.setBean(beanObj);
39+
methodBean.setMethod(method);
40+
// @Remote的value值作为key,MethodBean作为value
41+
Mediator.methodBeans.put(remoteValue, methodBean);
42+
}
43+
}
44+
}
45+
}
46+
47+
/**
48+
* 值越小优先级越高
49+
* @return
50+
*/
51+
@Override
52+
public int getOrder() {
53+
return -1;
54+
}
55+
}
Collapse file
+23Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import org.springframework.context.ApplicationListener;
4+
import org.springframework.context.event.ContextRefreshedEvent;
5+
import org.springframework.stereotype.Component;
6+
7+
/**
8+
* @author lhy
9+
* @date 2022/2/17
10+
*/
11+
@Component
12+
public class NettyApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
13+
@Override
14+
public void onApplicationEvent(ContextRefreshedEvent event) {
15+
// 开启额外线程启动Netty服务
16+
new Thread() {
17+
@Override
18+
public void run() {
19+
NettyServer.start();
20+
}
21+
}.start();
22+
}
23+
}
Collapse file
+51Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import io.netty.bootstrap.ServerBootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.EventLoopGroup;
8+
import io.netty.channel.nio.NioEventLoopGroup;
9+
import io.netty.channel.socket.SocketChannel;
10+
import io.netty.channel.socket.nio.NioServerSocketChannel;
11+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
12+
import io.netty.handler.codec.LengthFieldPrepender;
13+
import io.netty.handler.codec.string.StringDecoder;
14+
import io.netty.handler.codec.string.StringEncoder;
15+
16+
/**
17+
* 基于短连接的Netty服务端
18+
*
19+
* @author lhy
20+
* @date 2022/2/11
21+
*/
22+
public class NettyServer {
23+
public static void start() {
24+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
25+
EventLoopGroup workerGroup = new NioEventLoopGroup();
26+
try {
27+
ServerBootstrap serverBootstrap = new ServerBootstrap();
28+
serverBootstrap.group(bossGroup, workerGroup);
29+
serverBootstrap.channel(NioServerSocketChannel.class);
30+
31+
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
32+
.childHandler(new ChannelInitializer<SocketChannel>() {
33+
@Override
34+
protected void initChannel(SocketChannel ch) throws Exception {
35+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
36+
.addLast(new StringDecoder())
37+
.addLast(new ServerHandler())
38+
.addLast(new LengthFieldPrepender(4, false))
39+
.addLast(new StringEncoder());
40+
}
41+
});
42+
ChannelFuture future = serverBootstrap.bind(8080).sync();
43+
future.channel().closeFuture().sync();
44+
} catch (Exception e) {
45+
e.printStackTrace();
46+
} finally {
47+
bossGroup.shutdownGracefully();
48+
workerGroup.shutdownGracefully();
49+
}
50+
}
51+
}
Collapse file
+29Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.Mediator;
5+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
6+
import com.bruis.learnnetty.rpc.utils.Response;
7+
import io.netty.channel.ChannelHandler;
8+
import io.netty.channel.ChannelHandlerContext;
9+
import io.netty.channel.ChannelInboundHandlerAdapter;
10+
11+
/**
12+
* @author lhy
13+
* @date 2022/2/11
14+
*/
15+
@ChannelHandler.Sharable
16+
public class ServerHandler extends ChannelInboundHandlerAdapter {
17+
/**
18+
* 接受客户端发送过来的请求
19+
* @param ctx
20+
* @param msg
21+
* @throws Exception
22+
*/
23+
@Override
24+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25+
RequestFuture request = JSONObject.parseObject(msg.toString(), RequestFuture.class);
26+
Response response = Mediator.process(request);
27+
ctx.channel().writeAndFlush(JSONObject.toJSONString(response));
28+
}
29+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.