version: 4.1.55.Final
(相關(guān)資料圖)
傳統(tǒng)的IO模型的web容器,比如老版本的Tomcat,為了增加系統(tǒng)的吞吐量,需要不斷增加系統(tǒng)核心線程數(shù)量,或者通過(guò)水平擴(kuò)展服務(wù)器數(shù)量,來(lái)增加系統(tǒng)處理請(qǐng)求的能力。 有了NIO之后,一個(gè)線程即可處理多個(gè)連接事件,基于多路復(fù)用模型的Netty框架,不僅降低了使用NIO的復(fù)雜度,
優(yōu)點(diǎn)Netty是一款以java NIO為基礎(chǔ),基于事件驅(qū)動(dòng)模型支持異步、高并發(fā)的網(wǎng)絡(luò)應(yīng)用框架
API使用簡(jiǎn)單,開(kāi)發(fā)門(mén)檻低,簡(jiǎn)化了NIO開(kāi)發(fā)網(wǎng)絡(luò)程序的復(fù)雜度功能強(qiáng)大,預(yù)置多種編解碼功能,支持多種主流協(xié)議,比如Http、WebSocket。定制能力強(qiáng),可以通過(guò)ChannelHandler對(duì)通信框架靈活擴(kuò)展。性能高,支持異步非阻塞通信模型成熟穩(wěn)定,社區(qū)活躍,已經(jīng)修復(fù)了Java NIO所有的Bug。經(jīng)歷了大規(guī)模商業(yè)應(yīng)用的考驗(yàn),質(zhì)量有保證。IO模型select、poll和epoll
操作系統(tǒng)內(nèi)核基于這些函數(shù)實(shí)現(xiàn)非阻塞IO,以此實(shí)現(xiàn)多路復(fù)用模型
selectselect
select 調(diào)用需要傳入 fd 數(shù)組,需要拷貝一份到內(nèi)核,高并發(fā)場(chǎng)景下這樣的拷貝消耗的資源是驚人的。(可優(yōu)化為不復(fù)制)select 在內(nèi)核層仍然是通過(guò)遍歷的方式檢查文件描述符的就緒狀態(tài),是個(gè)同步過(guò)程,只不過(guò)無(wú)系統(tǒng)調(diào)用切換上下文的開(kāi)銷(xiāo)。(內(nèi)核層可優(yōu)化為異步事件通知)select 僅僅返回可讀文件描述符的個(gè)數(shù),具體哪個(gè)可讀還是要用戶(hù)自己遍歷。(可優(yōu)化為只返回給用戶(hù)就緒的文件描述符,無(wú)需用戶(hù)做無(wú)效的遍歷)pool和 select 的主要區(qū)別就是,去掉了 select 只能監(jiān)聽(tīng) 1024 個(gè)文件描述符的限制
epoolepool
內(nèi)核中保存一份文件描述符集合,無(wú)需用戶(hù)每次都重新傳入,只需告訴內(nèi)核修改的部分即可。內(nèi)核不再通過(guò)輪詢(xún)的方式找到就緒的文件描述符,而是通過(guò)異步 IO 事件喚醒。內(nèi)核僅會(huì)將有 IO 事件的文件描述符返回給用戶(hù),用戶(hù)也無(wú)需遍歷整個(gè)文件描述符集合。Reactor模型一、單Reactor單線程1)可以實(shí)現(xiàn)通過(guò)一個(gè)阻塞對(duì)象監(jiān)聽(tīng)多個(gè)鏈接請(qǐng)求
2)Reactor對(duì)象通過(guò)select監(jiān)聽(tīng)客戶(hù)端請(qǐng)求事件,通過(guò)dispatch進(jìn)行分發(fā)
3)如果是建立鏈接請(qǐng)求,則由Acceptor通過(guò)accept處理鏈接請(qǐng)求,然后創(chuàng)建一個(gè)Handler對(duì)象處理完成鏈接后的各種事件
4)如果不是鏈接請(qǐng)求,則由Reactor分發(fā)調(diào)用鏈接對(duì)應(yīng)的Handler來(lái)處理
5)Handler會(huì)完成Read->業(yè)務(wù)處理->send的完整業(yè)務(wù)流程
二、單Reactor多線程1)Reactor對(duì)象通過(guò)select監(jiān)聽(tīng)客戶(hù)端請(qǐng)求事件,收到事件后,通過(guò)dispatch分發(fā)
2)如果是建立鏈接請(qǐng)求,則由Acceptor通過(guò)accept處理鏈接請(qǐng)求,然后創(chuàng)建一個(gè)Handler對(duì)象處理完成鏈接后的各種事件
3)如果不是鏈接請(qǐng)求,則由Reactor分發(fā)調(diào)用鏈接對(duì)應(yīng)的Handler來(lái)處理
4)Handler只負(fù)責(zé)事件響應(yīng)不做具體業(yè)務(wù)處理
5)通過(guò)read讀取數(shù)據(jù)后,分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過(guò)send將結(jié)果返回給client
三、主從Reactor多線程1)Reactor主線程MainReactor對(duì)象通過(guò)select監(jiān)聽(tīng)鏈接事件,通過(guò)Acceptor處理
2)當(dāng)Acceptor處理鏈接事件后,MainReactor將鏈接分配給SubReactor
3)SubReactor將鏈接加入到隊(duì)列進(jìn)行監(jiān)聽(tīng),并創(chuàng)建Handler進(jìn)行事件處理
4)當(dāng)有新事件發(fā)生時(shí),SubReactor就會(huì)調(diào)用對(duì)應(yīng)的Handler處理
5)Handler通過(guò)read讀取數(shù)據(jù),分發(fā)到worker線程池處理,處理完成后返回給Handler,Handler收到后,通過(guò)send將結(jié)果返回給client
6)Reactor主線程可以對(duì)應(yīng)多個(gè)Reactor子線程
三種模式用生活案例來(lái)理解1)單Reactor單線程,前臺(tái)接待員和服務(wù)員是同一個(gè)人,全程為顧客服務(wù)
2)單Reactor多線程,1個(gè)前臺(tái)接待員,多個(gè)服務(wù)員,接待員只負(fù)責(zé)接待
3)主從Reactor多線程,多個(gè)前臺(tái)接待員,多個(gè)服務(wù)員
Reactor模型具有如下優(yōu)點(diǎn)1)響應(yīng)快,不必為單個(gè)同步事件所阻塞,雖然Reactor本身依然是同步的
2)可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷(xiāo)
3)擴(kuò)展性好,可以方便的通過(guò)增加Reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源
4)復(fù)用性好,Reactor模型本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性
核心組件1.Bootstrap 一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開(kāi)始,它主要作用是配置整個(gè)Netty程序,串聯(lián)起各個(gè)組件。
Handler,為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來(lái)處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。
2.ChannelInboundHandler 一個(gè)最常用的Handler。這個(gè)Handler的作用就是處理接收到數(shù)據(jù)時(shí)的事件,也就是說(shuō),我們的業(yè)務(wù)邏輯一般就是寫(xiě)在這個(gè)Handler里面的,ChannelInboundHandler就是用來(lái)處理我們的核心業(yè)務(wù)邏輯。
3.ChannelInitializer 當(dāng)一個(gè)鏈接建立時(shí),我們需要知道怎么來(lái)接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實(shí)現(xiàn)來(lái)處理它,那么ChannelInitializer便是用來(lái)配置這些Handler,它會(huì)提供一個(gè)ChannelPipeline,并把Handler加入到ChannelPipeline。
4.ChannelPipeline 一個(gè)Netty應(yīng)用基于ChannelPipeline機(jī)制,這種機(jī)制需要依賴(lài)于EventLoop和EventLoopGroup,因?yàn)樗鼈內(nèi)齻€(gè)都和事件或者事件處理相關(guān)。
EventLoops的目的是為Channel處理IO操作,一個(gè)EventLoop可以為多個(gè)Channel服務(wù)。
EventLoopGroup會(huì)包含多個(gè)EventLoop。
5.Channel 代表了一個(gè)Socket鏈接,或者其它和IO操作相關(guān)的組件,它和EventLoop一起用來(lái)參與IO處理。
6.Future 在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽(tīng),具體的實(shí)現(xiàn)就是通過(guò)Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽(tīng),當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽(tīng)會(huì)自動(dòng)觸發(fā)。
示例通過(guò)一個(gè)簡(jiǎn)單的示例,首先了解怎么基于netty開(kāi)發(fā)一個(gè)通信程序,包括服務(wù)的與客戶(hù)端:
Server:
@Slf4jpublic class Server { private EventLoopGroup boosGroup; private EventLoopGroup workGroup; public Server(int port){ try { init(port); log.info("----- 服務(wù)啟動(dòng)成功 -----"); } catch (InterruptedException e) { log.error("啟動(dòng)服務(wù)出錯(cuò):{}", e.getCause()); } } private void init(int port) throws InterruptedException { // 處理連接 this.boosGroup = new NioEventLoopGroup(); // 處理業(yè)務(wù) this.workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); // 綁定 bootstrap.group(boosGroup, workGroup) .channel(NioServerSocketChannel.class) //配置服務(wù)端 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_RCVBUF, 1024) .childOption(ChannelOption.SO_SNDBUF, 1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ServerHandler()); } }); ChannelFuture channelFuture = bootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } public void close(){ this.boosGroup.shutdownGracefully(); this.workGroup.shutdownGracefully(); }}@Slf4jclass ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>> server active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //1. 讀取客戶(hù)端的數(shù)據(jù)(緩存中去取并打印到控制臺(tái)) ByteBuf buf = (ByteBuf) msg; byte[] request = new byte[buf.readableBytes()]; buf.readBytes(request); String requestBody = new String(request, "utf-8"); log.info(">>>>>>>>> receive message: {}", requestBody); //2. 返回響應(yīng)數(shù)據(jù) ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); }} Client:
@Slf4jpublic class Client { private EventLoopGroup workGroup; private ChannelFuture channelFuture; public Client(int port){ init(port); } private void init(int port){ this.workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_RCVBUF, 1024) .option(ChannelOption.SO_SNDBUF, 1024) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ClientHandler()); } }); this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly(); } /** * * @param message */ public void send(String message){ this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes())); } /** * */ public void close(){ try { channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } workGroup.shutdownGracefully(); }}@Slf4jclass ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>> client active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); log.info(">>>>>>>>> receive message: {}", body); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); }} 測(cè)試:
public class StarterTests { static int port = 9011; @Test public void startServer(){ Server server = new Server(9011); } @Test public void startClient(){ Client client = new Client(port); client.send("Hello Netty!"); while (true){} }}生態(tài)DubboSpring Reactive類(lèi)似技術(shù)Mina、Netty、Grizzly
其他Proactor非阻塞異步網(wǎng)絡(luò)模型
參考https://mp.weixin.qq.com/s?__biz=MzUxNDA1NDI3OA==&mid=2247492766&idx=2&sn=b5df49147561e467fa5677b5bb09dacb&chksm=f9496577ce3eec61383994499d96a7f2b091b5eb8ee1ac47ad021f78072ae710f41d38257406&scene=27
https://blog.csdn.net/a745233700/article/details/122660246
熱門(mén)
聯(lián)系我們:435 226 40 @qq.com
版權(quán)所有 重播新聞網(wǎng) www.bluestd.cn 京ICP備2022022245號(hào)-17