Netty 是一個(gè)在 Java 生態(tài)里應(yīng)用非常廣泛的的網(wǎng)絡(luò)編程工具包,它在 2004 年誕生到現(xiàn)在依然是火的一塌糊涂,光在 github 上就有 30000 多個(gè)項(xiàng)目在用它。所以要想更好地掌握網(wǎng)絡(luò)編程,我想就繞不開(kāi) Netty。所以今天我們就來(lái)分析分析 Netty 內(nèi)部網(wǎng)絡(luò)模塊的工作原理。
友情提示,本文算上代碼將近有兩三萬(wàn)字,比較長(zhǎng),如果時(shí)間緊迫中間部分可以跳著看。第一節(jié)和最后的第六節(jié)建議必讀。當(dāng)然直接拖到尾部收藏點(diǎn)贊點(diǎn)轉(zhuǎn)發(fā),也是 ok 的,哈哈!
另外,今天又給大家申請(qǐng)到了贊助,在文末給大家申請(qǐng)了5本冰河的新書(shū)《深入理解高并發(fā)編程》,抽獎(jiǎng)送給大家。
(資料圖片僅供參考)
我們首先找一個(gè) Netty 的例子,本篇文章整體都是圍繞這個(gè)例子來(lái)展開(kāi)敘述的。我們下載 Netty 的源碼,并在 examples 中找到 echo 這個(gè) demo。同時(shí),為了防止代碼更新導(dǎo)致對(duì)本文敘述的影響,我們切到 4.1 分支上來(lái)。
# git checkout https://github.com/netty/netty.git# git checkout -b 4.1# cd example/src/main/java/io/netty/example/echo
在這個(gè) demo 的 EchoServer 中,展示了使用 Netty 寫(xiě) Server 的經(jīng)典用法。(飛哥在文章中會(huì)在不影響核心邏輯的表達(dá)上,對(duì)原始代碼盡心適當(dāng)?shù)木?jiǎn),比如下面代碼中的 try 就被我丟了)
public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer 如果你是一個(gè) Java 新手,或者干脆像飛哥一樣沒(méi)用 Netty 寫(xiě)過(guò)服務(wù),相信上述代碼基本是看不懂的。究其根本原因是相比 C/C++ ,Java 的封裝程度比較高。Java 語(yǔ)言本身的 JVM 中 NIO 對(duì)網(wǎng)絡(luò)的封裝就已經(jīng)屏蔽了很多底層的概念了,再加上 Netty 又封裝了一層,所以 Java 開(kāi)發(fā)者常用的一些術(shù)語(yǔ)和概念和其它語(yǔ)言出入很大。 比如上面代碼中的 Channel、NioEventLoopGroup 等都是其它語(yǔ)言中所沒(méi)見(jiàn)過(guò)的。不過(guò)你也不用感到害怕,因?yàn)檫@其中的每一個(gè)概念都是 socket、進(jìn)程等底層概念穿了一身不同的衣服而已。接下來(lái)我們分別細(xì)了解一下這些概念。 如果你沒(méi)接觸過(guò) Netty,可以簡(jiǎn)單把 NioEventLoopGroup 理解為一個(gè)線程池就可以。每一個(gè) NioEventLoopGroup 內(nèi)部包含一個(gè)或者多個(gè) NioEventLoop。 其中 NioEventLoop 是對(duì)線程、epoll 等概念進(jìn)行了一個(gè)集中的封裝。 首先,EventLoop 本身就是一個(gè)線程。為什么這么說(shuō),我們通過(guò)看 NioEventLoop 的繼承關(guān)系就能看出來(lái)。NioEventLoop 繼承于 SingleThreadEventLoop,而 SingleThreadEventLoop 又繼承于 SingleThreadEventExecutor。SingleThreadEventExecutor 實(shí)現(xiàn)了在 Netty 中對(duì)本地線程的抽象。 public abstract class SingleThreadEventExecutor extends ... { private volatile Thread thread; private final Queue 在 SingleThreadEventExecutor 中不但封裝了線程對(duì)象 Thread,而且還配置了一個(gè)任務(wù)隊(duì)列 taskQueue,用于其它線程向它來(lái)放置待處理的任務(wù)。 另外 NioEventLoopEventLoop 以 selector 的名義封裝了 epoll(在 Linux 操作系統(tǒng)下)。 在 NioEventLoop 對(duì)象內(nèi)部,會(huì)有 selector 成員定義。這其實(shí)就是封裝的 epoll 而來(lái)的。我們來(lái)看具體的封裝過(guò)程。以及 selectedKeys,這是從 selector 上發(fā)現(xiàn)的待處理的事件列表。 public final class NioEventLoop extends SingleThreadEventLoop{ // selector private Selector selector; private Selector unwrappedSelector; // selector 上發(fā)現(xiàn)的各種待處理事件 private SelectedSelectionKeySet selectedKeys;} NioEventLoopGroup 在構(gòu)造的時(shí)候,會(huì)調(diào)用 SelectorProvider#provider 來(lái)生成 provider,在默認(rèn)情況下會(huì)調(diào)用 sun.nio.ch.DefaultSelectorProvider.create 來(lái)創(chuàng)建。 //file:java/nio/channels/spi/SelectorProvider.javapublic abstract class SelectorProvider { public static SelectorProvider provider() { // 1. java.nio.channels.spi.SelectorProvider 屬性指定實(shí)現(xiàn)類(lèi) // 2. SPI 指定實(shí)現(xiàn)類(lèi) ...... // 3. 默認(rèn)實(shí)現(xiàn),Windows 和 Linux 下不同 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; }} 在 Linux 下,默認(rèn)創(chuàng)建的 provider 使用的就是 epoll。 //file:sun/nio/ch/DefaultSelectorProvider.javapublic class DefaultSelectorProvider { public static SelectorProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); }}1.3 Channel Channel 是 JavaNIO 里的一個(gè)概念。大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封裝就可以了。 Java 在 Channel 中把 connect、bind、read、write 等方法都以成員方法的形式給封裝起來(lái)了。 public interface Channel extends ... { Channel read(); Channel flush(); ...... interface Unsafe { void bind(SocketAddress localAddress, ...); void connect(SocketAddress remoteAddress, ...); void write(Object msg, ...); ...... }} 另外在 Java 中,習(xí)慣把 listen socket 叫做父 channel,客戶端握手請(qǐng)求到達(dá)以后創(chuàng)建出來(lái)的新連接叫做子 channel,方便區(qū)分。 在每個(gè) Channel 對(duì)象的內(nèi)部,除了封裝了 socket 以外,還都一個(gè)特殊的數(shù)據(jù)結(jié)構(gòu) DefaultChannelPipeline pipeline。在這個(gè) pipeline 里是各種時(shí)機(jī)里注冊(cè)的 handler。 Channel 上的讀寫(xiě)操作都會(huì)走到這個(gè) DefaultChannelPipeline 中,當(dāng) channel 上完成 register、active、read、readComplete 等操作時(shí),會(huì)觸發(fā) pipeline 中的相應(yīng)方法。 這個(gè) ChannelPipeline 其實(shí)就是一個(gè)雙向鏈表,以及鏈表上的各式各樣的操作方法。 public interface ChannelPipeline { ChannelPipeline addFirst(String name, ChannelHandler handler); ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); ChannelPipeline addLast(String name, ChannelHandler handler); ChannelPipeline fireChannelRead(Object msg);}1.5 EchoServer 解讀 現(xiàn)在我們具備了對(duì) Java、對(duì) Netty 的初步理解以后,我們?cè)贂?huì)后來(lái)看一下開(kāi)篇提到的 EchoServer 源碼。 public final class EchoServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer 在該代碼一開(kāi)頭,bossGroup = new NioEventLoopGroup(1)這一行是創(chuàng)建了一個(gè)只有一個(gè)線程的線程池。workerGroup = new NioEventLoopGroup又創(chuàng)建了 worker 線程池,沒(méi)有指定數(shù)量,Netty 內(nèi)部會(huì)根據(jù)當(dāng)前機(jī)器的 CPU 核數(shù)來(lái)靈活決定。 ServerBootstrap 這是一個(gè)腳手架類(lèi),是為了讓我們寫(xiě)起服務(wù)器程序來(lái)更方便一些。 b.group(bossGroup, workerGroup)這一行是將兩個(gè)線程池傳入,第一個(gè)作為 boss 只處理 accept 接收新的客戶端連接請(qǐng)求。第二個(gè)參數(shù)作為 worker 線程池,來(lái)處理連接上的請(qǐng)求接收、處理以及結(jié)果發(fā)送發(fā)送。 我們注意下 childHandler是傳入了一個(gè) ChannelInitializer,這是當(dāng)有新的客戶端連接到達(dá)時(shí)會(huì)回調(diào)的一個(gè)方法。在這個(gè)方法內(nèi)部,我們給這個(gè)新的 chaneel 的 pipeline 上添加了一個(gè)處理器 serverHandler,以便收到數(shù)據(jù)的時(shí)候執(zhí)行該處理器進(jìn)行請(qǐng)求處理。 上面的幾個(gè)方法都是定義,在b.bind方法中真正開(kāi)始啟動(dòng)服務(wù),創(chuàng)建父 channel(listen socket),創(chuàng)建 boss 線程。當(dāng)有新連接到達(dá)的時(shí)候 boss 線程再創(chuàng)建子 channel,為其 pipeline 添加處理器,并啟動(dòng) worker 線程來(lái)進(jìn)行處理。 簡(jiǎn)言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在構(gòu)建 Netty Server 的各種參數(shù)。 ServerBootstrap 和其父類(lèi) AbstractBootstrap 內(nèi)部分別定義了兩個(gè) EventLoopGroup group 成員。父類(lèi) AbstractBootstrap 的 group 是用來(lái)處理 accpet 事件的,ServerBootstrap 下的 childGroup 用來(lái)處理其它所有的讀寫(xiě)等事件。 group() 方法就是把 EventLoopGroup 參數(shù)設(shè)置到自己的成員上完事。其中如果調(diào)用 group() 只傳入了一個(gè)線程池,那么將來(lái)本服務(wù)下的所有事件都由這個(gè)線程池來(lái)處理。詳情查看飛哥精簡(jiǎn)后的源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 再看 ServerBootstrap#channel 方法 是用來(lái)定義一個(gè)工廠方法,將來(lái)需要?jiǎng)?chuàng)建 channel 的時(shí)候都調(diào)用該工廠進(jìn)行創(chuàng)建。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 回頭看本文開(kāi)頭 demo,.channel(NioServerSocketChannel.class)指的是將來(lái)需要?jiǎng)?chuàng)建 channel 的時(shí)候,創(chuàng)建 NioServerSocketChannel 這個(gè)類(lèi)型的。 再看 option 方法,只是設(shè)置到了 options 成員中而已 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 本文 demo 設(shè)置了兩處 handler,一處是 handler,另一處是 childHandler。他們都是分別設(shè)置到自己的成員上就完事,看源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends ...... { public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); } public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }}三、Netty bootstrap 啟動(dòng)服務(wù) ServerBootstrap 下的 bind 方法是服務(wù)啟動(dòng)過(guò)程中非常重要的一個(gè)方法。創(chuàng)建父 channel(listen socket),創(chuàng)建 boss 線程,為 boss 線程綁定 Acceptor 處理器,調(diào)用系統(tǒng)調(diào)用 bind 進(jìn)行綁定和監(jiān)聽(tīng)都是在這里完成的。 先來(lái)直接看一下 bind 相關(guān)的入口源碼。 //file:io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap ... { ......}//file:io/netty/bootstrap/AbstractBootstrap.javapublic abstract class AbstractBootstrap ... { public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(...); } private ChannelFuture doBind(final SocketAddress localAddress) { //創(chuàng)建父 channel、初始化并且注冊(cè) final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ...... //如果 Register 已經(jīng)完成,則直接 doBind0 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; //否則就注冊(cè)一個(gè) listener(回調(diào)),等 register 完成的時(shí)候調(diào)用 } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } return promise; } } //創(chuàng)建 channel,對(duì)其初始化,并且 register(會(huì)創(chuàng)建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) channel = channelFactory.newChannel(); //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 init(channel); //3.3 注冊(cè)并啟動(dòng) boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... } //3.4 真正的bind private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }} 在這個(gè)過(guò)程中,做了如下幾件重要的事情 接下來(lái)我們分開(kāi)來(lái)看。 在 initAndRegister() 方法中創(chuàng)建 channel(socket),它調(diào)用了 channelFactory.newChannel()。 public abstract class AbstractBootstrap //創(chuàng)建 channel,對(duì)其初始化,并且 register(會(huì)創(chuàng)建 parent 線程) final ChannelFuture initAndRegister() { //3.1 創(chuàng)建 listen socket channel = channelFactory.newChannel(); ...... }} 回想下 2.2 節(jié)的channel 方法,返回的是一個(gè)反射 ReflectiveChannelFactory。沒(méi)錯(cuò)這里的 newChannel 就是調(diào)用這個(gè)工廠方法來(lái)創(chuàng)建出來(lái)一個(gè) NioServerSocketChannel 對(duì)象。 在 initAndRegister 創(chuàng)建除了 channel 之后,需要調(diào)用 init 對(duì)其進(jìn)行初始化。 public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 init(channel); ...... }} 在 init() 中對(duì) channel 進(jìn)行初始化,一是給 options 和 attrs 賦值,二是構(gòu)建了父 channel 的 pipeline。 //file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 在 setChannelOptions 中對(duì) channel 的各種 option 進(jìn)行設(shè)置。回憶我們?cè)谑褂?ServerBootstrap 時(shí)可以傳入 SO_BACKLOG,這就是其中的一個(gè) option。在這里會(huì)真正設(shè)置到 channel(socket)上。 ServerBootstrap b = new ServerBootstrap();b.option(ChannelOption.SO_BACKLOG, 100) 在 init 中,稍微難理解一點(diǎn)是 p.addLast(new ChannelInitializer...)。這一段代碼只是給父 channel 添加一個(gè) handler 而已。其真正的執(zhí)行要等到 register 后,我們待會(huì)再看。 父 channel 在創(chuàng)建完,并且初始化之后,需要注冊(cè)到 boss 線程上才可用。 public abstract class AbstractBootstrap final ChannelFuture initAndRegister() { //3.1 創(chuàng)建父 channel(listen socket) //3.2 對(duì)父 channel(listen socket)進(jìn)行初始化 //3.3 注冊(cè)并啟動(dòng) boss 線程 ChannelFuture regFuture = config().group().register(channel); ...... }} 其中 config().group() 最終會(huì)調(diào)用到 AbstractBootstrap#group,在這個(gè)方法里獲取的是我們傳入進(jìn)來(lái)的 bossGroup。 public abstract class AbstractBootstrap volatile EventLoopGroup group; public final EventLoopGroup group() { return group; }} 其中 bossGroup 是一個(gè) NioEventLoopGroup 實(shí)例,所以代碼會(huì)進(jìn)入到 NioEventLoopGroup#register 方法。 public class NioEventLoopGroup extends MultithreadEventLoopGroup {}public abstract class MultithreadEventLoopGroup extends ... { @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public EventLoop next() { return (EventLoop) super.next(); }} 在 NioEventLoopGroup 里包含一個(gè)或多個(gè) EventLoop。上面的 next 方法就是從中選擇一個(gè)出來(lái),然后將 channel 注冊(cè)到其上。 對(duì)于本文來(lái)講,我們使用的是 NioEventLoopGroup,其內(nèi)部包含的自然也就是 NioEventLoop,我們繼續(xù)查找其 register 方法。 public final class NioEventLoop extends SingleThreadEventLoop //在 eventloop 里注冊(cè)一個(gè) channle(socket) public void register(final SelectableChannel ch, ...) { ...... register0(ch, interestOps, task); } //最終調(diào)用 channel 的 register private void register0(SelectableChannel ch, int interestOps, NioTask> task) { ch.register(unwrappedSelector, interestOps, task); }} 可見(jiàn),NioEventLoop 的 register 最后又調(diào)用到 channel 的 register 上了。在我們本文中,我們創(chuàng)建的 channel 是 NioServerSocketChannel,我們就依照這條線索來(lái)查。 //file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends DefaultAttributeMap implements Channel { public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... //關(guān)聯(lián)自己到 eventLoop AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } ...... } }} 在 channel 的父類(lèi) AbstractChannel 中的 register 中,先是把自己關(guān)聯(lián)到傳入的 eventLoop 上。接著調(diào)用 inEventLoop 來(lái)判斷線程當(dāng)前運(yùn)行的線程是否是 EventExecutor的支撐線程,是則返回直接 register0。 一般來(lái)說(shuō),服務(wù)在啟動(dòng)的時(shí)候都是主線程在運(yùn)行。這個(gè)時(shí)候很可能 boss 線程還沒(méi)有啟動(dòng)。所以如果發(fā)現(xiàn)當(dāng)前不是 boss 線程的話,就調(diào)用 eventLoop.execute 來(lái)啟動(dòng) boss 線程。 NioEventLoop 的父類(lèi)是 SingleThreadEventExecutor, 找到 execute 方法。 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { public void execute(Runnable task) { execute0(task); } private void execute0(@Schedule Runnable task) { execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }} 我們先來(lái)看 addTask(task),它是將 task 添加到任務(wù)隊(duì)列中。等待線程起來(lái)以后再運(yùn)行。 public abstract class SingleThreadEventExecutor extends ... { private final Queue inEventLoop() 是判斷當(dāng)前線程是不是自己綁定的線程,這時(shí)還在主線程中運(yùn)行,所以 inEventLoop 為假,會(huì)進(jìn)入 startThread 開(kāi)始為 EventLoop 創(chuàng)建線程。 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { private void startThread() { doStartThread(); ...... } private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); ...... } } } } 在 doStartThread 中調(diào)用 Java 線程管理工具 Executor 來(lái)啟動(dòng) boss 線程。 當(dāng)線程起來(lái)以后就進(jìn)入了自己的線程循環(huán)中了,會(huì)遍歷自己的任務(wù)隊(duì)列,然后開(kāi)始處理自己的任務(wù)。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務(wù)的話就開(kāi)始處理 runAllTasks(0); //任務(wù)處理完畢就調(diào)用 epoll_wait 等待事件發(fā)生 processSelectedKeys(); } }} 前面我們?cè)?3.3 節(jié)看到 eventLoop.execute 把一個(gè) Runnable 任務(wù)添加到了任務(wù)隊(duì)列里。當(dāng) EventLoop 線程啟動(dòng)后,它會(huì)遍歷自己的任務(wù)隊(duì)列并開(kāi)始處理。這時(shí)會(huì)進(jìn)入到 AbstractChannel#register0 方法開(kāi)始運(yùn)行。 //file:src/main/java/io/netty/channel/AbstractChannel.javapublic abstract class AbstractChannel extends ... { public final void register(...) { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ...... } private void register0(ChannelPromise promise) { doRegister(); ...... }} 函數(shù) doRegister 是在 AbstractNioChannel 類(lèi)下。 //file:io/netty/channel/nio/AbstractNioChannel.javapublic abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected SelectableChannel javaChannel() { return ch; } public NioEventLoop eventLoop() { return (NioEventLoop) super.eventLoop(); } protected void doRegister() throws Exception { boolean selected = false; for (;;) { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); } }} 上面最關(guān)鍵的一句是selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。這一句就相當(dāng)于在 C 語(yǔ)言下調(diào)用 epoll_ctl 把 listen socket 添加到了 epoll 對(duì)象下。 其中 javaChannel 獲取父 channel,相當(dāng)于 listen socket。unwrappedSelector 獲取 selector,相當(dāng)于 epoll 對(duì)象。register 相當(dāng)于使用 epoll_ctl 執(zhí)行 add 操作。 當(dāng) channel 注冊(cè)完后,前面 init 時(shí)注冊(cè)的 ChannelInitializer 回調(diào)就會(huì)被執(zhí)行。再回頭看它的 回調(diào)定義。 //file:src/main/java/io/netty/bootstrap/ServerBootstrap.javapublic class ServerBootstrap extends AbstractBootstrap 在 ChannelInitializer#initChannel 里,又給 boss 線程的 pipeline 里添加了一個(gè)任務(wù)。該任務(wù)是讓其在自己的 pipeline 上注冊(cè)一個(gè) ServerBootstrapAcceptor handler。將來(lái)有新連接到達(dá)的時(shí)候,ServerBootstrapAcceptor 將會(huì)被執(zhí)行。 再看 doBind0 方法,調(diào)用 channel.bind 完成綁定。 private static void doBind0(...) { channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ...... } }); }四、新連接到達(dá) 我們?cè)倩氐?boss 線程的主循環(huán)中。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); //任務(wù)隊(duì)列都處理完就開(kāi)始 select if (!hasTasks()) { strategy = select(curDeadlineNanos); } //處理各種事件 if (strategy > 0) { processSelectedKeys(); } } } private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }} 假如線程任務(wù)隊(duì)列中的任務(wù)都處理干凈了的情況下,boss 線程會(huì)調(diào)用 select 來(lái)發(fā)現(xiàn)其 selector 上的各種事件。相當(dāng)于 C 語(yǔ)言中的 epoll_wait。 當(dāng)發(fā)現(xiàn)有事件發(fā)生的時(shí)候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的時(shí)候,會(huì)進(jìn)入相應(yīng)的處理 public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ...... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }} 對(duì)于服務(wù)端的 Unsafe.read() 這里會(huì)執(zhí)行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它會(huì)調(diào)用 JDK 底層的 ServerSocketChannel.accept() 接收到客戶端的連接后,將其封裝成 Netty 的 NioSocketChannel,再通過(guò) Pipeline 將 ChannelRead 事件傳播出去,這樣 ServerBootstrapAcceptor 就可以在 ChannelRead 回調(diào)里處理新的客戶端連接了。 我們直接看 ServerBootstrapAcceptor#ChannelRead。 //file:public class ServerBootstrap extends AbstractBootstrap 在 channelRead 先是獲取到了新創(chuàng)建出來(lái)的子 channel,并為其 pipeline 添加 childHandler?;仡^看 1.5 節(jié),childHandler 是我們自定義的。 緊接著調(diào)用 childGroup.register(child) 將子 channel 注冊(cè)到 workerGroup 上。這個(gè) register 過(guò)程和 3.3節(jié)、3.5節(jié)過(guò)程一樣。區(qū)別就是前面是父 channel 注冊(cè)到 bossGroup 上,這里是子 channel 注冊(cè)到 workerGroup上。 在 register 完成后,子 channel 被掛到了 workerGroup 其中一個(gè)線程上,相應(yīng)的線程如果沒(méi)有創(chuàng)建也會(huì)被創(chuàng)建出來(lái)并進(jìn)入到自己的線程循環(huán)中。 當(dāng)子 channel 注冊(cè)完畢的時(shí)候,childHandler 中 ChannelInitializer#initChannel 會(huì)被執(zhí)行 public final class EchoServer { public static void main(String[] args) throws Exception { ... ServerBootstrap b = new ServerBootstrap(); b.childHandler(new ChannelInitializer 在 initChannel 把子 channel 的處理類(lèi) serverHandler 添加上來(lái)了。Netty demo 中對(duì)這個(gè)處理類(lèi)的定義非常的簡(jiǎn)單,僅僅只是打印出來(lái)而已。 public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}五、用戶請(qǐng)求到達(dá) 當(dāng) worker 線程起來(lái)以后,會(huì)進(jìn)入線程循環(huán)(boss 線程和 worker 線程的 run 函數(shù)是一個(gè))。在循環(huán)中會(huì)遍歷自己的任務(wù)隊(duì)列,如果沒(méi)有任務(wù)可處理,便 select 來(lái)觀察自己所負(fù)責(zé)的 channel 上是否有事件發(fā)生。 public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { if (!hasTasks()) { strategy = select(curDeadlineNanos); } //如果有任務(wù)的話就開(kāi)始處理 runAllTasks(0); //任務(wù)處理完畢就調(diào)用 epoll_wait 等待事件發(fā)生 processSelectedKeys(); } } private int select(long deadlineNanos) throws IOException { selector.selectNow(); ...... }} worker 線程會(huì)調(diào)用 select 發(fā)現(xiàn)自己所管理的所有子 channel 上的可讀可寫(xiě)事件。在發(fā)現(xiàn)有可讀事件后,會(huì)調(diào)用 processSelectedKeys,最后觸發(fā) pipeline 使得 EchoServerHandler 方法開(kāi)始執(zhí)行。 public class EchoServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(......) { ctx.write(msg); } ......}六、總結(jié) 事實(shí)上,Netty 對(duì)網(wǎng)絡(luò)封裝的比較靈活。既支持單線程 Reactor,也支持多線程 Reactor、還支持主從多線程 Reactor。三種模型對(duì)應(yīng)的用法如下: public static void main(String[] args) throws Exception { //單線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(1); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //多線程 Reactor EventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); ...... //主從多線程 Reactor EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); ......} 為了表述的更全面,本文飛哥選擇的是最為經(jīng)典的 主從多線程 Reactor 模式。本文中所描述的內(nèi)容可以用下面一幅圖來(lái)表示。 在 Netty 中的 boss 線程中負(fù)責(zé)對(duì)父 channel(listen socket)上事件的監(jiān)聽(tīng)和處理,當(dāng)有新連接到達(dá)的時(shí)候,選擇一個(gè) worker 線程把這個(gè)子 channel(連接 socket )叫給 worker 線程來(lái)處理。 其中 Worker 線程就是等待其管理的所有子 channel(連接 socket)上的事件的監(jiān)聽(tīng)和處理。當(dāng)發(fā)現(xiàn)有事件發(fā)生的時(shí)候,回調(diào)用戶設(shè)置的 handler 進(jìn)行處理。在本文的例子中,這個(gè)用戶 handler 就是 EchoServerHandler#channelRead。 至此,Netty 網(wǎng)絡(luò)模塊的工作核心原理咱們就介紹完了。飛哥一直“鼓吹”內(nèi)功的好處。只要你具備了堅(jiān)實(shí)的內(nèi)功,各種語(yǔ)言里看似風(fēng)牛馬不相及的東西,在底層實(shí)際上原理是想通的。我本人從來(lái)沒(méi)用 Java 開(kāi)發(fā)過(guò)服務(wù)器程序,更沒(méi)碰過(guò) Netty。但是當(dāng)你多epoll有了深入理解的時(shí)候,再看Netty也能很容易看懂,很快就能理解它的核心。這就是鍛煉內(nèi)功的好處! 標(biāo)簽:
Netty
- 每日熱點(diǎn):剖析Netty內(nèi)部網(wǎng)絡(luò)實(shí)現(xiàn)原理
- 天天快訊:HTTP/3來(lái)了!存續(xù)二十多年的TCP協(xié)議最終被拋棄!
- 雙重黑科技加持 諸多細(xì)節(jié)設(shè)計(jì)讓ROG魔霸6Plus戰(zhàn)力十足
- 觀熱點(diǎn):SD-WAN的自動(dòng)化以及為什么需要WAN加速
- 天天熱推薦:圖解網(wǎng)絡(luò):訪問(wèn)控制列表 ACL,功能堪比防火墻
- 訊息:面試突擊:為什么要用HTTPS?它有什么用?
- 最新:SDX的成敗幾何?
- 12.34秒!飛騰騰銳D2000實(shí)現(xiàn)國(guó)產(chǎn)筆記本電腦開(kāi)機(jī)新速度
- 每日精選:數(shù)字孿生,能給無(wú)線通信帶來(lái)什么?
- 微速訊:信令分析:KDDI重大故障為何持續(xù)60小時(shí)之久?