Netty 是一個(gè)在 Java 生態(tài)里應(yīng)用非常廣泛的的網(wǎng)絡(luò)編程工具包,它在 2004 年誕生到現(xiàn)在依然是火的一塌糊涂,光在 github 上就有 30000 多個(gè)項(xiàng)目在用它。所以要想更好地掌握網(wǎng)絡(luò)編程,我想就繞不開(kāi) Netty。所以今天我們就來(lái)分析分析 Netty 內(nèi)部網(wǎng)絡(luò)模塊的工作原理。

友情提示,本文算上代碼將近有兩三萬(wàn)字,比較長(zhǎng),如果時(shí)間緊迫中間部分可以跳著看。第一節(jié)和最后的第六節(jié)建議必讀。當(dāng)然直接拖到尾部收藏點(diǎn)贊點(diǎn)轉(zhuǎn)發(fā),也是 ok 的,哈哈!

另外,今天又給大家申請(qǐng)到了贊助,在文末給大家申請(qǐng)了5本冰河的新書(shū)《深入理解高并發(fā)編程》,抽獎(jiǎng)送給大家。


(資料圖片僅供參考)

一、Netty 用法

我們首先找一個(gè) Netty 的例子,本篇文章整體都是圍繞這個(gè)例子來(lái)展開(kāi)敘述的。我們下載 Netty 的源碼,并在 examples 中找到 echo 這個(gè) demo。同時(shí),為了防止代碼更新導(dǎo)致對(duì)本文敘述的影響,我們切到 4.1 分支上來(lái)。

# git checkout https://github.com/netty/netty.git# git checkout -b 4.1# cd example/src/main/java/io/netty/example/echo

在這個(gè) demo 的 EchoServer 中,展示了使用 Netty 寫(xiě) Server 的經(jīng)典用法。(飛哥在文章中會(huì)在不影響核心邏輯的表達(dá)上,對(duì)原始代碼盡心適當(dāng)?shù)木?jiǎn),比如下面代碼中的 try 就被我丟了)

public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); ...... }}

如果你是一個(gè) Java 新手,或者干脆像飛哥一樣沒(méi)用 Netty 寫(xiě)過(guò)服務(wù),相信上述代碼基本是看不懂的。究其根本原因是相比 C/C++ ,Java 的封裝程度比較高。Java 語(yǔ)言本身的 JVM 中 NIO 對(duì)網(wǎng)絡(luò)的封裝就已經(jīng)屏蔽了很多底層的概念了,再加上 Netty 又封裝了一層,所以 Java 開(kāi)發(fā)者常用的一些術(shù)語(yǔ)和概念和其它語(yǔ)言出入很大。

比如上面代碼中的 Channel、NioEventLoopGroup 等都是其它語(yǔ)言中所沒(méi)見(jiàn)過(guò)的。不過(guò)你也不用感到害怕,因?yàn)檫@其中的每一個(gè)概念都是 socket、進(jìn)程等底層概念穿了一身不同的衣服而已。接下來(lái)我們分別細(xì)了解一下這些概念。

1.1 NioEventLoopGroup

如果你沒(méi)接觸過(guò) Netty,可以簡(jiǎn)單把 NioEventLoopGroup 理解為一個(gè)線程池就可以。每一個(gè) NioEventLoopGroup 內(nèi)部包含一個(gè)或者多個(gè) NioEventLoop。

其中 NioEventLoop 是對(duì)線程、epoll 等概念進(jìn)行了一個(gè)集中的封裝。

首先,EventLoop 本身就是一個(gè)線程。為什么這么說(shuō),我們通過(guò)看 NioEventLoop 的繼承關(guān)系就能看出來(lái)。NioEventLoop 繼承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又繼承于 SingleThreadEventExecutor。SingleThreadEventExecutor 實(shí)現(xiàn)了在 Netty 中對(duì)本地線程的抽象。

public abstract class SingleThreadEventExecutor extends ... { private volatile Thread thread; private final Queue taskQueue;}

在 SingleThreadEventExecutor 中不但封裝了線程對(duì)象 Thread,而且還配置了一個(gè)任務(wù)隊(duì)列 taskQueue,用于其它線程向它來(lái)放置待處理的任務(wù)。

1.2 selector

另外 NioEventLoopEventLoop 以 selector 的名義封裝了 epoll(在 Linux 操作系統(tǒng)下)。

在 NioEventLoop 對(duì)象內(nèi)部,會(huì)有 selector 成員定義。這其實(shí)就是封裝的 epoll 而來(lái)的。我們來(lái)看具體的封裝過(guò)程。以及 selectedKeys,這是從 selector 上發(fā)現(xiàn)的待處理的事件列表。

public final class NioEventLoop extends SingleThreadEventLoop{ // selector private Selector selector; private Selector unwrappedSelector; // selector 上發(fā)現(xiàn)的各種待處理事件 private SelectedSelectionKeySet selectedKeys;}

NioEventLoopGroup 在構(gòu)造的時(shí)候,會(huì)調(diào)用 SelectorProvider#provider 來(lái)生成 provider,在默認(rèn)情況下會(huì)調(diào)用 sun.nio.ch.DefaultSelectorProvider.create 來(lái)創(chuàng)建。

//file:java/nio/channels/spi/SelectorProvider.javapublic abstract class SelectorProvider { public static SelectorProvider provider() { // 1. java.nio.channels.spi.SelectorProvider 屬性指定實(shí)現(xiàn)類(lèi) // 2. SPI 指定實(shí)現(xiàn)類(lèi) ...... // 3. 默認(rèn)實(shí)現(xiàn),Windows 和 Linux 下不同 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; }}

在 Linux 下,默認(rèn)創(chuàng)建的 provider 使用的就是 epoll。

//file:sun/nio/ch/DefaultSelectorProvider.javapublic class DefaultSelectorProvider { public static SelectorProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); }}1.3 Channel

Channel 是 JavaNIO 里的一個(gè)概念。大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封裝就可以了。

Java 在 Channel 中把 connect、bind、read、write 等方法都以成員方法的形式給封裝起來(lái)了。

public interface Channel extends ... { Channel read(); Channel flush(); ...... interface Unsafe { void bind(SocketAddress localAddress, ...); void connect(SocketAddress remoteAddress, ...); void write(Object msg, ...); ...... }}

另外在 Java 中,習(xí)慣把 listen socket 叫做父 channel,客戶端握手請(qǐng)求到達(dá)以后創(chuàng)建出來(lái)的新連接叫做子 channel,方便區(qū)分。

1.4 Pipeline

在每個(gè) Channel 對(duì)象的內(nèi)部,除了封裝了 socket 以外,還都一個(gè)特殊的數(shù)據(jù)結(jié)構(gòu) DefaultChannelPipeline pipeline。在這個(gè) pipeline 里是各種時(shí)機(jī)里注冊(cè)的 handler。

Channel 上的讀寫(xiě)操作都會(huì)走到這個(gè) DefaultChannelPipeline 中,當(dāng) channel 上完成 register、active、read、readComplete 等操作時(shí),會(huì)觸發(fā) pipeline 中的相應(yīng)方法。

這個(gè) ChannelPipeline 其實(shí)就是一個(gè)雙向鏈表,以及鏈表上的各式各樣的操作方法。

public interface ChannelPipeline { ChannelPipeline addFirst(String name, ChannelHandler handler); ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); ChannelPipeline addLast(String name, ChannelHandler handler); ChannelPipeline fireChannelRead(Object msg);}1.5 EchoServer 解讀

現(xiàn)在我們具備了對(duì) Java、對(duì) Netty 的初步理解以后,我們?cè)贂?huì)后來(lái)看一下開(kāi)篇提到的 EchoServer 源碼。

public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); ...... }}

在該代碼一開(kāi)頭,bossGroup = new NioEventLoopGroup(1)這一行是創(chuàng)建了一個(gè)只有一個(gè)線程的線程池。workerGroup = new NioEventLoopGroup又創(chuàng)建了 worker 線程池,沒(méi)有指定數(shù)量,Netty 內(nèi)部會(huì)根據(jù)當(dāng)前機(jī)器的 CPU 核數(shù)來(lái)靈活決定。

ServerBootstrap 這是一個(gè)腳手架類(lèi),是為了讓我們寫(xiě)起服務(wù)器程序來(lái)更方便一些。

b.group(bossGroup, workerGroup)這一行是將兩個(gè)線程池傳入,第一個(gè)作為 boss 只處理 accept 接收新的客戶端連接請(qǐng)求。第二個(gè)參數(shù)作為 worker 線程池,來(lái)處理連接上的請(qǐng)求接收、處理以及結(jié)果發(fā)送發(fā)送。

我們注意下 childHandler是傳入了一個(gè) ChannelInitializer,這是當(dāng)有新的客戶端連接到達(dá)時(shí)會(huì)回調(diào)的一個(gè)方法。在這個(gè)方法內(nèi)部,我們給這個(gè)新的 chaneel 的 pipeline 上添加了一個(gè)處理器 serverHandler,以便收到數(shù)據(jù)的時(shí)候執(zhí)行該處理器進(jìn)行請(qǐng)求處理。

上面的幾個(gè)方法都是定義,在b.bind方法中真正開(kāi)始啟動(dòng)服務(wù),創(chuàng)建父 channel(listen socket),創(chuàng)建 boss 線程。當(dāng)有新連接到達(dá)的時(shí)候 boss 線程再創(chuàng)建子 channel,為其 pipeline 添加處理器,并啟動(dòng) worker 線程來(lái)進(jìn)行處理。

二、Netty bootstrap 參數(shù)構(gòu)建

簡(jiǎn)言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在構(gòu)建 Netty Server 的各種參數(shù)。

2.1 group 設(shè)置

ServerBootstrap 和其父類(lèi) AbstractBootstrap 內(nèi)部分別定義了兩個(gè) EventLoopGroup group 成員。父類(lèi) AbstractBootstrap 的 group 是用來(lái)處理 accpet 事件的,ServerBootstrap 下的 childGroup 用來(lái)處理其它所有的讀寫(xiě)等事件。

group() 方法就是把 EventLoopGroup 參數(shù)設(shè)置到自己的成員上完事。其中如果調(diào)用 group() 只傳入了一個(gè)線程池,那么將來(lái)本服務(wù)下的所有事件都由這個(gè)線程池來(lái)處理。詳情查看飛哥精簡(jiǎn)后的源碼。

//file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap { //用來(lái)處理非 accept 以外的線程池 private volatile EventLoopGroup childGroup; public ServerBootstrap group(EventLoopGroup group) { return group(group, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }}public abstract class AbstractBootstrap ... { //用來(lái)處理 accept 的線程 volatile EventLoopGroup group; public B group(EventLoopGroup group) { this.group = group; ...... }}2.2 channel 設(shè)置

再看 ServerBootstrap#channel 方法 是用來(lái)定義一個(gè)工廠方法,將來(lái)需要?jiǎng)?chuàng)建 channel 的時(shí)候都調(diào)用該工廠進(jìn)行創(chuàng)建。

//file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap { public B channel(Class channelClass) { return channelFactory(new ReflectiveChannelFactory( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }}

回頭看本文開(kāi)頭 demo,.channel(NioServerSocketChannel.class)指的是將來(lái)需要?jiǎng)?chuàng)建 channel 的時(shí)候,創(chuàng)建 NioServerSocketChannel 這個(gè)類(lèi)型的。

2.3 option 設(shè)置

再看 option 方法,只是設(shè)置到了 options 成員中而已

//file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap { public B option(ChannelOption option, T value) { ObjectUtil.checkNotNull(option, "option"); synchronized (options) { if (value == null) { options.remove(option); } else { options.put(option, value); } } return self(); }}2.4 handler 方法

本文 demo 設(shè)置了兩處 handler,一處是 handler,另一處是 childHandler。他們都是分別設(shè)置到自己的成員上就完事,看源碼。

//file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends ...... { public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); } public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }}三、Netty bootstrap 啟動(dòng)服務(wù)

ServerBootstrap 下的 bind 方法是服務(wù)啟動(dòng)過(guò)程中非常重要的一個(gè)方法。創(chuàng)建父 channel(listen socket),創(chuàng)建 boss 線程,為 boss 線程綁定 Acceptor 處理器,調(diào)用系統(tǒng)調(diào)用 bind 進(jìn)行綁定和監(jiān)聽(tīng)都是在這里完成的。

先來(lái)直接看一下 bind 相關(guān)的入口源碼。

//file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap ... { ......}//file:io/netty/bootstrap/AbstractBootstrap.javapublic abstract class AbstractBootstrap ... { public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(...); } private ChannelFuture doBind(final SocketAddress localAddress) { //創(chuàng)建父 channel、初始化并且注冊(cè) final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ...... //如果 Register 已經(jīng)完成,則直接 doBind0 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; //否則就注冊(cè)一個(gè) listener(回調(diào)),等 register 完成的時(shí)候調(diào)用 } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } return promise; } } //創(chuàng)建 channel,對(duì)其初始化,并且 register(會(huì)創(chuàng)建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) channel = channelFactory.newChannel(); //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 init(channel); //3.3 注冊(cè)并啟動(dòng) boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... } //3.4 真正的bind private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }}

在這個(gè)過(guò)程中,做了如下幾件重要的事情

創(chuàng)建父 channel(listen socket)對(duì)父 channel(listen socket)進(jìn)行初始化register父 channel(listen socket)到主 group,并啟動(dòng)主進(jìn)程真正的 bind

接下來(lái)我們分開(kāi)來(lái)看。

3.1 創(chuàng)建父 channel(listen socket)

在 initAndRegister() 方法中創(chuàng)建 channel(socket),它調(diào)用了 channelFactory.newChannel()。

public abstract class AbstractBootstrap //創(chuàng)建 channel,對(duì)其初始化,并且 register(會(huì)創(chuàng)建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創(chuàng)建 listen socket channel = channelFactory.newChannel(); ...... }}

回想下 2.2 節(jié)的channel 方法,返回的是一個(gè)反射 ReflectiveChannelFactory。沒(méi)錯(cuò)這里的 newChannel 就是調(diào)用這個(gè)工廠方法來(lái)創(chuàng)建出來(lái)一個(gè) NioServerSocketChannel 對(duì)象。

3.2 對(duì)父 channel(listen socket)進(jìn)行初始化

在 initAndRegister 創(chuàng)建除了 channel 之后,需要調(diào)用 init 對(duì)其進(jìn)行初始化。

public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 init(channel); ...... }}

在 init() 中對(duì) channel 進(jìn)行初始化,一是給 options 和 attrs 賦值,二是構(gòu)建了父 channel 的 pipeline。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap { void init(Channel channel) { //設(shè)置 option 和 attr setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, newAttributesArray()); //設(shè)置 pipeline ChannelPipeline p = channel.pipeline(); p.addLast(new ChannelInitializer() { ...... }); }}

在 setChannelOptions 中對(duì) channel 的各種 option 進(jìn)行設(shè)置。回憶我們?cè)谑褂?ServerBootstrap 時(shí)可以傳入 SO_BACKLOG,這就是其中的一個(gè) option。在這里會(huì)真正設(shè)置到 channel(socket)上。

ServerBootstrap b = new ServerBootstrap();b.option(ChannelOption.SO_BACKLOG, 100)

在 init 中,稍微難理解一點(diǎn)是 p.addLast(new ChannelInitializer...)。這一段代碼只是給父 channel 添加一個(gè) handler 而已。其真正的執(zhí)行要等到 register 后,我們待會(huì)再看。

3.3 register 父 channel

父 channel 在創(chuàng)建完,并且初始化之后,需要注冊(cè)到 boss 線程上才可用。

public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 //3.3 注冊(cè)并啟動(dòng) boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... }}

其中 config().group() 最終會(huì)調(diào)用到 AbstractBootstrap#group,在這個(gè)方法里獲取的是我們傳入進(jìn)來(lái)的 bossGroup。

public abstract class AbstractBootstrap volatile EventLoopGroup group; public final EventLoopGroup group() { return group; }}

其中 bossGroup 是一個(gè) NioEventLoopGroup 實(shí)例,所以代碼會(huì)進(jìn)入到 NioEventLoopGroup#register 方法。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {}public abstract class MultithreadEventLoopGroup extends ... { @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public EventLoop next() { return (EventLoop) super.next(); }}

在 NioEventLoopGroup 里包含一個(gè)或多個(gè) EventLoop。上面的 next 方法就是從中選擇一個(gè)出來(lái),然后將 channel 注冊(cè)到其上。

對(duì)于本文來(lái)講,我們使用的是 NioEventLoopGroup,其內(nèi)部包含的自然也就是 NioEventLoop,我們繼續(xù)查找其 register 方法。

public final class NioEventLoop extends SingleThreadEventLoop //在 eventloop 里注冊(cè)一個(gè) channle(socket) public void register(final SelectableChannel ch, ...) { ...... register0(ch, interestOps, task); } //最終調(diào)用 channel 的 register private void register0(SelectableChannel ch, int interestOps, NioTask task) { ch.register(unwrappedSelector, interestOps, task); }}

可見(jiàn),NioEventLoop 的 register 最后又調(diào)用到 channel 的 register 上了。在我們本文中,我們創(chuàng)建的 channel 是 NioServerSocketChannel,我們就依照這條線索來(lái)查。

//file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends DefaultAttributeMap implements Channel { public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... //關(guān)聯(lián)自己到 eventLoop AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } ...... } }}

在 channel 的父類(lèi) AbstractChannel 中的 register 中,先是把自己關(guān)聯(lián)到傳入的 eventLoop 上。接著調(diào)用 inEventLoop 來(lái)判斷線程當(dāng)前運(yùn)行的線程是否是 EventExecutor的支撐線程,是則返回直接 register0。

一般來(lái)說(shuō),服務(wù)在啟動(dòng)的時(shí)候都是主線程在運(yùn)行。這個(gè)時(shí)候很可能 boss 線程還沒(méi)有啟動(dòng)。所以如果發(fā)現(xiàn)當(dāng)前不是 boss 線程的話,就調(diào)用 eventLoop.execute 來(lái)啟動(dòng) boss 線程。

NioEventLoop 的父類(lèi)是 SingleThreadEventExecutor, 找到 execute 方法。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { public void execute(Runnable task) { execute0(task); } private void execute0(@Schedule Runnable task) { execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }}

我們先來(lái)看 addTask(task),它是將 task 添加到任務(wù)隊(duì)列中。等待線程起來(lái)以后再運(yùn)行。

public abstract class SingleThreadEventExecutor extends ... { private final Queue taskQueue; protected void addTask(Runnable task) { (task); } final boolean offerTask(Runnable task) { return taskQueue.offer(task); }}

inEventLoop() 是判斷當(dāng)前線程是不是自己綁定的線程,這時(shí)還在主線程中運(yùn)行,所以 inEventLoop 為假,會(huì)進(jìn)入 startThread 開(kāi)始為 EventLoop 創(chuàng)建線程。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { private void startThread() { doStartThread(); ...... } private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); ...... } } } }

在 doStartThread 中調(diào)用 Java 線程管理工具 Executor 來(lái)啟動(dòng) boss 線程。

3.4 boss 線程啟動(dòng)

當(dāng)線程起來(lái)以后就進(jìn)入了自己的線程循環(huán)中了,會(huì)遍歷自己的任務(wù)隊(duì)列,然后開(kāi)始處理自己的任務(wù)。

public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務(wù)的話就開(kāi)始處理 runAllTasks(0); //任務(wù)處理完畢就調(diào)用 epoll_wait 等待事件發(fā)生 processSelectedKeys(); } }}

前面我們?cè)?3.3 節(jié)看到 eventLoop.execute 把一個(gè) Runnable 任務(wù)添加到了任務(wù)隊(duì)列里。當(dāng) EventLoop 線程啟動(dòng)后,它會(huì)遍歷自己的任務(wù)隊(duì)列并開(kāi)始處理。這時(shí)會(huì)進(jìn)入到 AbstractChannel#register0 方法開(kāi)始運(yùn)行。

//file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends ... { public final void register(...) { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ...... } private void register0(ChannelPromise promise) { doRegister(); ...... }}

函數(shù) doRegister 是在 AbstractNioChannel 類(lèi)下。

//file:io/netty/channel/nio/AbstractNioChannel.javapublic abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected SelectableChannel javaChannel() { return ch; } public NioEventLoop eventLoop() { return (NioEventLoop) super.eventLoop(); } protected void doRegister() throws Exception { boolean selected = false; for (;;) { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); } }}

上面最關(guān)鍵的一句是selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。這一句就相當(dāng)于在 C 語(yǔ)言下調(diào)用 epoll_ctl 把 listen socket 添加到了 epoll 對(duì)象下。

其中 javaChannel 獲取父 channel,相當(dāng)于 listen socket。unwrappedSelector 獲取 selector,相當(dāng)于 epoll 對(duì)象。register 相當(dāng)于使用 epoll_ctl 執(zhí)行 add 操作。

當(dāng) channel 注冊(cè)完后,前面 init 時(shí)注冊(cè)的 ChannelInitializer 回調(diào)就會(huì)被執(zhí)行。再回頭看它的 回調(diào)定義。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap { void init(Channel channel) ...... p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) { ...... ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }}

在 ChannelInitializer#initChannel 里,又給 boss 線程的 pipeline 里添加了一個(gè)任務(wù)。該任務(wù)是讓其在自己的 pipeline 上注冊(cè)一個(gè) ServerBootstrapAcceptor handler。將來(lái)有新連接到達(dá)的時(shí)候,ServerBootstrapAcceptor 將會(huì)被執(zhí)行。

3.5 真正的 bind

再看 doBind0 方法,調(diào)用 channel.bind 完成綁定。

private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }四、新連接到達(dá)

我們?cè)倩氐?boss 線程的主循環(huán)中。

public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); //任務(wù)隊(duì)列都處理完就開(kāi)始 select if (!hasTasks()) { strategy = select(curDeadlineNanos); } //處理各種事件 if (strategy > 0) { processSelectedKeys(); } } } private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }}

假如線程任務(wù)隊(duì)列中的任務(wù)都處理干凈了的情況下,boss 線程會(huì)調(diào)用 select 來(lái)發(fā)現(xiàn)其 selector 上的各種事件。相當(dāng)于 C 語(yǔ)言中的 epoll_wait。

當(dāng)發(fā)現(xiàn)有事件發(fā)生的時(shí)候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的時(shí)候,會(huì)進(jìn)入相應(yīng)的處理

public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ...... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }}

對(duì)于服務(wù)端的 Unsafe.read() 這里會(huì)執(zhí)行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它會(huì)調(diào)用 JDK 底層的 ServerSocketChannel.accept() 接收到客戶端的連接后,將其封裝成 Netty 的 NioSocketChannel,再通過(guò) Pipeline 將 ChannelRead 事件傳播出去,這樣 ServerBootstrapAcceptor 就可以在 ChannelRead 回調(diào)里處理新的客戶端連接了。

我們直接看 ServerBootstrapAcceptor#ChannelRead。

//file:public class ServerBootstrap extends AbstractBootstrap { ...... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { // 獲取child channel final Channel child = (Channel) msg; // 設(shè)置 childHandler 到 child channel child.pipeline().addLast(childHandler); // 設(shè)置 childOptions、 childAttrs setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); // 將 child channel 注冊(cè)到 childGroup childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } }}

在 channelRead 先是獲取到了新創(chuàng)建出來(lái)的子 channel,并為其 pipeline 添加 childHandler?;仡^看 1.5 節(jié),childHandler 是我們自定義的。

緊接著調(diào)用 childGroup.register(child) 將子 channel 注冊(cè)到 workerGroup 上。這個(gè) register 過(guò)程和 3.3節(jié)、3.5節(jié)過(guò)程一樣。區(qū)別就是前面是父 channel 注冊(cè)到 bossGroup 上,這里是子 channel 注冊(cè)到 workerGroup上。

在 register 完成后,子 channel 被掛到了 workerGroup 其中一個(gè)線程上,相應(yīng)的線程如果沒(méi)有創(chuàng)建也會(huì)被創(chuàng)建出來(lái)并進(jìn)入到自己的線程循環(huán)中。

當(dāng)子 channel 注冊(cè)完畢的時(shí)候,childHandler 中 ChannelInitializer#initChannel 會(huì)被執(zhí)行

public final class EchoServer { public static void main(String[] args) throws Exception { ... ServerBootstrap b = new ServerBootstrap(); b.childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(serverHandler); } }); ...... }}

在 initChannel 把子 channel 的處理類(lèi) serverHandler 添加上來(lái)了。Netty demo 中對(duì)這個(gè)處理類(lèi)的定義非常的簡(jiǎn)單,僅僅只是打印出來(lái)而已。

public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}五、用戶請(qǐng)求到達(dá)

當(dāng) worker 線程起來(lái)以后,會(huì)進(jìn)入線程循環(huán)(boss 線程和 worker 線程的 run 函數(shù)是一個(gè))。在循環(huán)中會(huì)遍歷自己的任務(wù)隊(duì)列,如果沒(méi)有任務(wù)可處理,便 select 來(lái)觀察自己所負(fù)責(zé)的 channel 上是否有事件發(fā)生。

public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務(wù)的話就開(kāi)始處理 runAllTasks(0); //任務(wù)處理完畢就調(diào)用 epoll_wait 等待事件發(fā)生 processSelectedKeys(); } } private int select(long deadlineNanos) throws IOException { selector.selectNow(); ...... }}

worker 線程會(huì)調(diào)用 select 發(fā)現(xiàn)自己所管理的所有子 channel 上的可讀可寫(xiě)事件。在發(fā)現(xiàn)有可讀事件后,會(huì)調(diào)用 processSelectedKeys,最后觸發(fā) pipeline 使得 EchoServerHandler 方法開(kāi)始執(zhí)行。

public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}六、總結(jié)

事實(shí)上,Netty 對(duì)網(wǎng)絡(luò)封裝的比較靈活。既支持單線程 Reactor,也支持多線程 Reactor、還支持主從多線程 Reactor。三種模型對(duì)應(yīng)的用法如下:

public static void main(String[] args) throws Exception { //單線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(1); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //多線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //主從多線程 Reactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); ......}

為了表述的更全面,本文飛哥選擇的是最為經(jīng)典的 主從多線程 Reactor 模式。本文中所描述的內(nèi)容可以用下面一幅圖來(lái)表示。

在 Netty 中的 boss 線程中負(fù)責(zé)對(duì)父 channel(listen socket)上事件的監(jiān)聽(tīng)和處理,當(dāng)有新連接到達(dá)的時(shí)候,選擇一個(gè) worker 線程把這個(gè)子 channel(連接 socket )叫給 worker 線程來(lái)處理。

其中 Worker 線程就是等待其管理的所有子 channel(連接 socket)上的事件的監(jiān)聽(tīng)和處理。當(dāng)發(fā)現(xiàn)有事件發(fā)生的時(shí)候,回調(diào)用戶設(shè)置的 handler 進(jìn)行處理。在本文的例子中,這個(gè)用戶 handler 就是 EchoServerHandler#channelRead。

至此,Netty 網(wǎng)絡(luò)模塊的工作核心原理咱們就介紹完了。飛哥一直“鼓吹”內(nèi)功的好處。只要你具備了堅(jiān)實(shí)的內(nèi)功,各種語(yǔ)言里看似風(fēng)牛馬不相及的東西,在底層實(shí)際上原理是想通的。我本人從來(lái)沒(méi)用 Java 開(kāi)發(fā)過(guò)服務(wù)器程序,更沒(méi)碰過(guò) Netty。但是當(dāng)你多epoll有了深入理解的時(shí)候,再看Netty也能很容易看懂,很快就能理解它的核心。這就是鍛煉內(nèi)功的好處!

標(biāo)簽: Netty