Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 051fa57

Browse filesBrowse files
committed
add Netty源码分析-ChannelPipeline下
1 parent 2061519 commit 051fa57
Copy full SHA for 051fa57

File tree

Expand file treeCollapse file tree

1 file changed

+322
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

1 file changed

+322
-0
lines changed
Open diff view settings
Collapse file
+322Lines changed: 322 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
## 1. 概览
2+
3+
上篇已经讲解了ChannelPipeline以及ChannelHandler的关系以及对应的类继承关系图,本节来详细分析一下inbound和outbound的原理。
4+
5+
## 2. DefaultChannelPipeline源码分析
6+
7+
在DefaultChannelPipeline中,定义了一个head“头结点”和一个tail“尾结点”,它们都是AbstractChannelhandlerContext类的节点,我们都知道在ChannelPipeline中AbstractChannelHandlerContext就是节点元素的抽象类实现,而这个handlerContext持有ChannelHandler。
8+
9+
在Netty中我们还需要知道inbound和outbound类型的ChannelHandler节点的执行顺序。
10+
11+
下面来先看下一个Netty的demo
12+
13+
该Netty的demo中,分别定义了六个Handler,分为两组,一组是inboundHandler,另一组是outboundHandler。
14+
15+
16+
InBoundHandlerA
17+
```java
18+
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
19+
20+
@Override
21+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
22+
System.out.println("InBoundHandlerA: " + msg);
23+
ctx.fireChannelRead(msg);
24+
}
25+
}
26+
```
27+
28+
InBoundHandlerB
29+
```java
30+
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
31+
@Override
32+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
33+
System.out.println("OutBoundHandlerB: " + msg);
34+
ctx.write(msg, promise);
35+
}
36+
37+
38+
@Override
39+
public void handlerAdded(final ChannelHandlerContext ctx) {
40+
ctx.executor().schedule(() -> {
41+
ctx.channel().write("ctx.channel().write -> hello world");
42+
ctx.write("hello world");
43+
}, 3, TimeUnit.SECONDS);
44+
}
45+
}
46+
```
47+
48+
InBoundHandlerC
49+
```java
50+
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
51+
@Override
52+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
53+
System.out.println("InBoundHandlerC: " + msg);
54+
ctx.fireChannelRead(msg);
55+
}
56+
}
57+
```
58+
59+
```java
60+
public final class Server {
61+
62+
public static void main(String[] args) throws Exception {
63+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
64+
EventLoopGroup workerGroup = new NioEventLoopGroup();
65+
66+
try {
67+
ServerBootstrap b = new ServerBootstrap();
68+
b.group(bossGroup, workerGroup)
69+
.channel(NioServerSocketChannel.class)
70+
.childOption(ChannelOption.TCP_NODELAY, true)
71+
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
72+
.childHandler(new ChannelInitializer<SocketChannel>() {
73+
@Override
74+
public void initChannel(SocketChannel ch) {
75+
ch.pipeline().addLast(new InBoundHandlerA());
76+
ch.pipeline().addLast(new InBoundHandlerB());
77+
ch.pipeline().addLast(new InBoundHandlerC());
78+
}
79+
});
80+
81+
ChannelFuture f = b.bind(8888).sync();
82+
83+
f.channel().closeFuture().sync();
84+
} finally {
85+
bossGroup.shutdownGracefully();
86+
workerGroup.shutdownGracefully();
87+
}
88+
}
89+
}
90+
```
91+
92+
执行结果如下:
93+
```
94+
InBoundHandlerA: hello world
95+
InBoundHandlerB: hello world
96+
InBoundHandlerC: hello world
97+
```
98+
99+
可以发现Netty中,对于inboundHandler来说是按照顺序执行操作的。
100+
101+
接着在看看outboundHandler定义如下
102+
103+
OutBoundHandlerA
104+
```java
105+
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
106+
107+
@Override
108+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109+
System.out.println("OutBoundHandlerA: " + msg);
110+
ctx.write(msg, promise);
111+
}
112+
}
113+
```
114+
115+
OutBoundHandlerB
116+
```java
117+
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
118+
@Override
119+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
120+
System.out.println("OutBoundHandlerB: " + msg);
121+
ctx.write(msg, promise);
122+
}
123+
}
124+
```
125+
126+
OutBoundHandlerC
127+
```java
128+
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
129+
130+
@Override
131+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
132+
System.out.println("OutBoundHandlerC: " + msg);
133+
ctx.write(msg, promise);
134+
}
135+
}
136+
```
137+
138+
139+
然后修改Server类为如下,
140+
141+
```java
142+
public final class Server {
143+
144+
public static void main(String[] args) throws Exception {
145+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
146+
EventLoopGroup workerGroup = new NioEventLoopGroup();
147+
148+
try {
149+
ServerBootstrap b = new ServerBootstrap();
150+
b.group(bossGroup, workerGroup)
151+
.channel(NioServerSocketChannel.class)
152+
.childOption(ChannelOption.TCP_NODELAY, true)
153+
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
154+
.childHandler(new ChannelInitializer<SocketChannel>() {
155+
@Override
156+
public void initChannel(SocketChannel ch) {
157+
ch.pipeline().addLast(new OutBoundHandlerA());
158+
ch.pipeline().addLast(new OutBoundHandlerB());
159+
ch.pipeline().addLast(new OutBoundHandlerC());
160+
}
161+
});
162+
163+
ChannelFuture f = b.bind(8888).sync();
164+
165+
f.channel().closeFuture().sync();
166+
} finally {
167+
bossGroup.shutdownGracefully();
168+
workerGroup.shutdownGracefully();
169+
}
170+
}
171+
}
172+
```
173+
174+
执行结果如下:
175+
```
176+
OutBoundHandlerC: ctx.channel().write -> hello world
177+
OutBoundHandlerB: ctx.channel().write -> hello world
178+
OutBoundHandlerA: ctx.channel().write -> hello world
179+
OutBoundHandlerA: hello world
180+
```
181+
182+
可以看到在Netty中对于ountboundHandler来说,是倒序执行的。
183+
184+
整个Netty执行ChannelHandler可以用下图来描述。
185+
186+
![channelPipeline事件传播图](https://coderbruis.github.io/javaDocs/img/netty/source/ChannelPipeline事件传播图.png)
187+
188+
189+
上图描述的Head节点顺序执行,Tail节点逆序执行的源码是在DefaultChannelPipeline中,在《Netty-ChannelPipeline-上》文章开头就已经说明了,对于inboundHandler类型的Handler,主要还是用于监听Channel的read、register、active、exceptionCaught等事件,而对于outboundHandler类型来说,主要是用于bind、connect、write、flush等事件,回顾了这一点后,我们在继续看DefaultChannelPipeline源码
190+
191+
```java
192+
public class DefaultChannelPipeline implements ChannelPipeline {
193+
... 省略
194+
195+
@Override
196+
public final ChannelPipeline fireChannelRead(Object msg) {
197+
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
198+
return this;
199+
}
200+
201+
@Override
202+
public final ChannelFuture write(Object msg) {
203+
return tail.write(msg);
204+
}
205+
206+
... 省略
207+
}
208+
```
209+
210+
分别以inbound类型的channelRead和outbound类型的write来分析。
211+
212+
DefaultChannelPipeline.java
213+
```java
214+
@Override
215+
public final ChannelPipeline fireChannelRead(Object msg) {
216+
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
217+
return this;
218+
}
219+
```
220+
在AbstractChannelHandlerContext#invokeChannelRead方法中,传入了一个重要的入参:head,这里就是传入的Head头结点,这一重要调用得以让inbound类型handler在ChannelPipeline中按顺序执行。
221+
222+
AbstractChannelHandlerContext.java
223+
```java
224+
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
225+
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
226+
EventExecutor executor = next.executor();
227+
// 在NioEventLoop线程内,next这里传入的是head头结点
228+
if (executor.inEventLoop()) {
229+
next.invokeChannelRead(m);
230+
} else {
231+
executor.execute(new Runnable() {
232+
@Override
233+
public void run() {
234+
next.invokeChannelRead(m);
235+
}
236+
});
237+
}
238+
}
239+
240+
private void invokeChannelRead(Object msg) {
241+
if (invokeHandler()) {
242+
try {
243+
((ChannelInboundHandler) handler()).channelRead(this, msg);
244+
} catch (Throwable t) {
245+
invokeExceptionCaught(t);
246+
}
247+
} else {
248+
fireChannelRead(msg);
249+
}
250+
}
251+
252+
```
253+
254+
ChannelInboundHandler#channelRead的调用,会最终来到InBoundHandlerA里的channelRead方法。
255+
```java
256+
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
257+
258+
@Override
259+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
260+
System.out.println("InBoundHandlerA: " + msg);
261+
ctx.fireChannelRead(msg);
262+
}
263+
}
264+
```
265+
266+
经过AbstractChannelHandlerContext#fireChannelRead,会在ChannelPipeline中寻找下一个inbound,然后继续执行channelRead。
267+
268+
```java
269+
@Override
270+
public ChannelHandlerContext fireChannelRead(final Object msg) {
271+
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
272+
return this;
273+
}
274+
```
275+
276+
277+
细看OutBoundHandlerB#handlerAdded方法由两个write,一个是ctx.channel.write,另一个是ctx.write,这两个有啥区别呢?为啥输出结果是三条:ctx.channel().write -> hello world,一条hello world呢?
278+
279+
启动Server启动类之后,再cmd窗口输入连接socket的命令debug之后分析得
280+
281+
```
282+
telnet 127.0.0.1 8888
283+
```
284+
285+
在客户端socket连接进Netty之后,会先注册channel并init初始化,这时会调用Server类里ServerBootstrap注入的ChannelInitilizer的initChannel方法,最终得以向ChannelPipeline里添加进OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC,随后调用
286+
287+
```java
288+
ch.pipeline().addLast(new xxxx)
289+
```
290+
只有会触发DefaultChannelPipeline#callHandlerAdded0()方法,最终来到OutBoundHandler里的handlerAdded()方法,并向Netty的定时任务队列里添加了一个匿名内部任务,也就是:
291+
292+
```java
293+
@Override
294+
public void handlerAdded(final ChannelHandlerContext ctx) {
295+
ctx.executor().schedule(() -> {
296+
ctx.channel().write("ctx.channel().write -> hello world");
297+
ctx.write("hello world");
298+
}, 3, TimeUnit.SECONDS);
299+
}
300+
```
301+
302+
随后完成客户端Socket的初始化工作。此时服务端的selector继续执行for死循环,执行到任务队列,此时发现任务队列中有一个定时任务需要执行,则拿出任务并执行任务,执行过程会跳转到上面的匿名内部类,并依次执行ctx.channel().write()和ctx.write()两个方法。
303+
304+
```java
305+
ctx.channel().write()
306+
```
307+
方法会从ChannelPipeline的尾部tail开始执行(上文已经总结过,outboundHandler都是从tail节点开始执行handler) ,所以字符串“ctx.channel().write -> hello world”就会按outboundHandlerC、outboundHandlerB、outboundHandlerC这个顺序开始执行,执行完head节点之后会一路往上返回到Ctx.channel().write()
308+
方法,并最后去执行ctx.write()方法,而ctx.write()方法会从当前的handler节点开始向前执行,所以当前outboundHandlerB的前节点是outboundHandlerA,所以最终控制台打印出:
309+
```
310+
OutBoundHandlerC: ctx.channel().write -> hello world
311+
OutBoundHandlerB: ctx.channel().write -> hello world
312+
OutBoundHandlerA: ctx.channel().write -> hello world
313+
OutBoundHandlerA: hello world
314+
```
315+
316+
整个过程比较复杂,也比较绕,下面用一张流程图来描述整个过程。
317+
318+
![NettyChannelPipeline流程图1](https://coderbruis.github.io/javaDocs/img/netty/source/NettyChannelPipeline流程图1.png)
319+
320+
321+
- TODO ChannelPipeline优化?MASK
322+
- TODO SimpleChannelInboundHandler源码分析

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.