开编我想说 刻意练习,本着课本的例子,照着我也写一遍的原则进行练习。
基础知识真的太重要,很多基础知识是会影响我们阅读书的效果,甚至可能会误解书本的原意。就拿着当前阅读的书来说起,如果我们不知道计算机操作系统基础,不知Java网络编程基础,不知网络协议等,那么我们看书可能会举步维艰。所以,在看本书之前,我尝试查阅一些相关资料,以补充能够更好吸收书本知识。
本文章,就是书本很多地方的内容,并未能深刻理解,一本书的内容也不可能全部呈现。例如,零拷贝,各种网络协议的理解,例如tcp,udp协议等。很多基础内容,都感觉相对薄弱,所以日后需要加强基础的部分。
阅读前的预习,大有裨益 同步,是一个可靠的有序操作,例如,有顺序执行操作A->操作B,如果操作A没有完成返回,操作B需要排队等候;反之,异步则相反无需等待,通常可以依靠回调或者事件的方式来进行操作的次序的问题。
在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如 ServerSocket 新连接建立完毕,或数据读取、写入操作完成;而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。摘抄自这里 
Linux 操作系统中select、poll、epoll详细内容可以查看  High-Performance I/O Design Patterns详细内容可以看  概述 Netty is an asynchronous event-driven network application framework
从上面我抽取了三个关键词,asynchronous、event-driven、high performance。带着三个关键信息,看能够从书中摄取到三个核心的内容。
核心内容 核心组件 Channel Channel,是出入站数据的载体,或者是网络中Socket抽象代表。
Channel的生命周期:register->active->inactive->unregistred
EventLoop 用于处理连接的生命周期的中发生的事件
Channel、EventLoop和EventLoopGroup的关系图
图中可以看到EventLoopGroup其实就具有多个EventLoop的组,EventLoop会在Channel的整个生命周期处理I/O事件。
一个EventLoopGroup包含一个或者多个EventLoop 一个EventLoop一个生命周期中,只和一个线程绑定 EventLoop所有I/O处理事件,将有专用的线程处理 一个Channel生命周期,只会注册到一个EventLoop上 一个EventLoop可以会分配多个Channel 得益于EventLoop是一个固定的线程处理,给定的Channel上的I/O的处理将会在同一个线程处理,避免了不必要的线程切换上下文的开销;
下面来深入了解一下,EventLoop:
java.util.concurrent io.netty.utilconcurrent io.netty.channel一个EventLoop一个生命周期中,只和一个线程绑定 EventLoop的执行逻辑:
ChannelPipeline pipeline,意译为管道,ChannelPipeline,一看到这个名字,我们能够猜到它的作用就是类似管道的作用。
本节,需要了解pipeline它的头部和尾部的概念,入站从头部第一个ChannelHandler先入,出站的时候从尾端端第一个ChannelHandler先开始流出。
顺便提一下,Channel一旦分配为ChannelPipeline后,是永久性操作,不能被修改。
AbstractChannelprotected  AbstractChannel (Channel parent)  {
1 2 3 4 5 DefaultChannelPipelineprotected  DefaultChannelPipeline (Channel channel)  {this .channel = ObjectUtil.checkNotNull(channel, "channel" );
最后,上面的设计使得,我们 变动(增删改)  ChannelPipeline上的Handler也是相当方便的。
ChannelHandler ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
ChannelInboundHandler 入站处理器接口,处理入站数据和状态变化 类  型 描  述 channelRegistered 当Channel已经注册到它的EventLoop并且能够处理I/O时被调用 channelUnregistered 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用 channelActive 当Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪 channelInactive 当Channel离开活动状态并且不再连接它的远程节点时被调用 channelReadComplete 当Channel上的一个读操作完成时被调用 channelRead 当从Channel读取数据时被调用 userEventTriggered 当ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个POJO被传经了ChannelPipeline 
我们可以通过继承 ChannelInboundHandlerAdapter 来编写自己的入站处理器。常用的是:SimpleChannelInboundHandler,因为它给我优化了一些常用的操作,例如,资源的自动释放等
异常处理:
ChannelOutboundHandler 出站处理器接口,处理出站的所有数据,并且能够拦截所有的操作。 类  型 描  述 bind(ChannelHandlerContext,SocketAddress,ChannelPromise) 当请求将Channel绑定到本地地址时被调用 connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)当 请求将Channel连接到远程节点时被调用 disconnect(ChannelHandlerContext,ChannelPromise) 当请求将Channel从远程节点断开时被调用 close(ChannelHandlerContext,ChannelPromise) 当请求关闭Channel时被调用 deregister(ChannelHandlerContext,ChannelPromise) 当请求将Channel从它的EventLoop注销时被调用 read(ChannelHandlerContext) 当请求从Channel读取更多的数据时被调用 flush(ChannelHandlerContext) 当请求通过Channel将入队数据冲刷到远程节点时被调用 write(ChannelHandlerContext,Object,ChannelPromise) 当请求通过Channel将数据写到远程节点时被调用 
异常处理:
在出站操作都会返回ChannelFuture,进行添加监听事件 在ChannelOutboundHandler的入参都会带有ChannelPromis,进行添加监听事件 ChannelHandlerContext 紧接上面,我来看一下Channel、ChannelPipeline、ChannelHandler和ChannelHandlerContext之间的关系:
ChannelHandler高级用法:我们可以在使用ChannelHandler可以缓存ChannelHanlderContext,然后去完成一些复杂的操作。
ChannelFuture Netty提供了ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
Bootstrap 
引导客户端 和 无连接协议 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public  final  class  EchoClient  {static  final  boolean  SSL  =  System.getProperty("ssl" ) != null ;static  final  String  HOST  =  System.getProperty("host" , "127.0.0.1" );static  final  int  PORT  =  Integer.parseInt(System.getProperty("port" , "8023" ));public  static  void  main (String[] args)  throws  Exception {final  SslContext sslCtx;if  (SSL) {else  {null ;EventLoopGroup  group  =  new  NioEventLoopGroup ();try  {Bootstrap  b  =  new  Bootstrap ();new  ChannelInitializer <SocketChannel>() {@Override public  void  initChannel (SocketChannel ch)  throws  Exception {ChannelPipeline  p  =  ch.pipeline();if  (sslCtx != null ) {new  CustomSimpleChannelInboundHandler ())ChannelFuture  f  =  b.connect(HOST, PORT).sync();finally  {
引导流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public  final  class  EchoServer  {static  final  boolean  SSL  =  System.getProperty("ssl" ) != null ;static  final  int  PORT  =  Integer.parseInt(System.getProperty("port" , "8007" ));public  static  final  String  SECOND_HANDLER_NAME  =  "second" ;public  static  void  main (String[] args)  throws  Exception {final  SslContext sslCtx;if  (SSL) {SelfSignedCertificate  ssc  =  new  SelfSignedCertificate ();else  {null ;EventLoopGroup  bossGroup  =  new  NioEventLoopGroup (1 );EventLoopGroup  workerGroup  =  new  NioEventLoopGroup (2 );final  EchoServerHandler  serverHandler  =  new  EchoServerHandler ();try  {ServerBootstrap  b  =  new  ServerBootstrap ();100 )new  LoggingHandler (LogLevel.INFO))new  ChannelInitializer <SocketChannel>() {@Override public  void  initChannel (SocketChannel ch)  throws  Exception {ChannelPipeline  p  =  ch.pipeline();if  (sslCtx != null ) {ChannelFuture  f  =  b.bind(PORT).sync();finally  {
我主要需要关闭我们创建EventLoopGroup,我们可以通过shutdownGracefully方法来优雅地关闭。
1 2 3 4 5 6 7 @Override public  Future<?> shutdownGracefully(long  quietPeriod, long  timeout, TimeUnit unit) {for  (EventExecutor l: children) {return  terminationFuture();
重新认识字节 The byte is a unit of digital information that most commonly consists of eight bits. Historically, the byte was the number of bits used to encode a single character of text in a computer and for this reason it is the smallest addressable unit of memory in many computer architectures
为啥说重新认识了字节,因为自己在学习ByteBuf的时候犯了一些低级的错误(单元测试将呈现我的低级错误),反应出来自己的基础还是不够牢固。
ByteBuff,实现原理有两个索引指针,一个用于读取(readerIndex),一个用于写入(writerIndex)。
下面的例子详细说,不同ByteBuff的使用模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 public  class  ByteBuffExample  {public  static  final  String  HI_BYTE_BUFF  =  "Hi! ByteBuff." ;private  static  final  ByteBuf  heapBuff  =  Unpooled.copiedBuffer(HI_BYTE_BUFF, CharsetUtil.UTF_8);public  static  final  Charset  UTF_8  =  CharsetUtil.UTF_8;public  static  void  main (String[] args)  {"==================heap buff==================" );"==================direct buff==================" );"==================composite buff==================" );"==================read and write==================" );"==================ByteBufHolder==================" );public  static  void  heapByteBuff ()  {if  (heapBuff.hasArray()) {byte [] array = heapBuff.array();int  offset  =  heapBuff.arrayOffset() + heapBuff.readerIndex();int  length  =  heapBuff.readableBytes();byte [] target = new  byte [length];0 , length);new  String (target));private  static  void  directByteBuff ()  {ByteBuf  directBuffer  =  Unpooled.directBuffer().writeBytes(heapBuff);if  (!directBuffer.hasArray()) {int  readerIndex  =  directBuffer.readerIndex();int  length  =  directBuffer.readableBytes();byte [] array = new  byte [length];new  String (array));private  static  void  compositeByteBuff ()  {CompositeByteBuf  compositeByteBuf  =  Unpooled.compositeBuffer();ByteBuf  headBuf  =  Unpooled.copiedBuffer("Hi! " , CharsetUtil.UTF_8);ByteBuf  bodyBuf  =  Unpooled.copiedBuffer("ByteBuff." , CharsetUtil.UTF_8);"compositeByteBuf.removeComponent(0) before:" );for  (ByteBuf byteBuf : compositeByteBuf) {"compositeByteBuf.removeComponent(0) after:" );0 );for  (ByteBuf byteBuf : compositeByteBuf) {private  static  void  readAndWrite ()  {ByteBuf  byteBuf  =  Unpooled.copiedBuffer(HI_BYTE_BUFF, CharsetUtil.UTF_8);ByteBuf  slice  =  byteBuf.slice(0 , 3 );ByteBuf  copy  =  byteBuf.copy(0 , 3 );assert  !ByteBufUtil.equals(slice, copy);0 , ((byte ) 'J' ));0 , ((byte ) 'W' ));assert  byteBuf.getByte(0 ) == slice.getByte(0 );assert  copy.getByte(0 ) != byteBuf.getByte(0 );char ) byteBuf.getByte(0 )));int  readerIndex  =  byteBuf.readerIndex();int  writerIndex  =  byteBuf.writerIndex();1 , ((byte ) 'B' ));char ) byteBuf.getByte(1 )));assert  readerIndex  = = byteBuf.readerIndex();assert  writerIndex  = = byteBuf.writerIndex();byte ) '?' ));assert  readerIndex  = = byteBuf.readerIndex();assert  writerIndex != byteBuf.writerIndex();private  static  void  byteBuffHolder ()  {ByteBufHolder  byteBufHolder  =  new  DefaultByteBufHolder (Unpooled.copiedBuffer(HI_BYTE_BUFF, UTF_8));ByteBuf  content  =  byteBufHolder.copy().content();0 , ((byte ) 'W' ));"source: "  + ((char ) heapBuff.getByte(0 )));"new: "  + ((char ) content.getByte(0 )));
解码器和编码器 解码器 总体来说,我们有两种需求:
字节解码成消息,常用抽象类:ByteToMessageDecoder extends ChannelInboundHandlerAdapter 消息A解码成消息B,常用抽象类:MessageToMessageDecoder extends ChannelInboundHandlerAdapter 编码器 总体来说,我们有两种需求:
消息编码成消息,常用抽象类:MessageToByteEncoder extends ChannelOutboundHandlerAdapter 消息B编码成消息A,常用抽象类:MessageToMessageEncoder extends ChannelOutboundHandlerAdapter 抽象的编解码类 很多时候,编解码是一对,我们就想着为啥不能直接设置成一个类?Netty给我们,预设Codec。
字节编解码成消息,ByteToMessageCodec extend ChannelDuplexHandler 消息编解码成消息,MessageToMessageCodec extend ChannelDuplexHandler HttpObjectAggregator 源码分析 1 2 HttpObjectAggregatorextends  MessageAggregator <HttpObject, HttpMessage, HttpContent, FullHttpMessage>
核心逻辑其实是在 MessageAggregator 中 decode 方法中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 @Override protected  void  decode (final  ChannelHandlerContext ctx, I msg, List<Object> out)  throws  Exception {assert  aggregating;if  (isStartMessage(msg)) {false ;if  (currentMessage != null ) {null ;throw  new  MessageAggregationException ();@SuppressWarnings("unchecked") S  m  =  (S) msg;Object  continueResponse  =  newContinueResponse(m, maxContentLength, ctx.pipeline());if  (continueResponse != null ) {ChannelFutureListener  listener  =  continueResponseWriteListener;if  (listener == null ) {new  ChannelFutureListener () {@Override public  void  operationComplete (ChannelFuture future)  throws  Exception {if  (!future.isSuccess()) {boolean  closeAfterWrite  =  closeAfterContinueResponse(continueResponse);final  ChannelFuture  future  =  ctx.writeAndFlush(continueResponse).addListener(listener);if  (closeAfterWrite) {return ;if  (handlingOversizedMessage) {return ;else  if  (isContentLengthInvalid(m, maxContentLength)) {return ;if  (m instanceof  DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {if  (m instanceof  ByteBufHolder) {else  {return ;CompositeByteBuf  content  =  ctx.alloc().compositeBuffer(maxCumulationBufferComponents);if  (m instanceof  ByteBufHolder) {else  if  (isContentMessage(msg)) {if  (currentMessage == null ) {return ;CompositeByteBuf  content  =  (CompositeByteBuf) currentMessage.content();@SuppressWarnings("unchecked") final  C  m  =  (C) msg;if  (content.readableBytes() > maxContentLength - m.content().readableBytes()) {@SuppressWarnings("unchecked") S  s  =  (S) currentMessage;return ;final  boolean  last;if  (m instanceof  DecoderResultProvider) {DecoderResult  decoderResult  =  ((DecoderResultProvider) m).decoderResult();if  (!decoderResult.isSuccess()) {if  (currentMessage instanceof  DecoderResultProvider) {true ;else  {else  {if  (last) {null ;else  {throw  new  MessageAggregationException ();
不可被忽略的单元测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  class  AbsIntegerEncoder  extends  MessageToMessageEncoder <ByteBuf> {@Override protected  void  encode (ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)  throws  Exception {while  (msg.readableBytes() >= 4 ) {int  value  =  Math.abs(msg.readInt());
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  class  AbsIntegerEncoderTest  {@Test public  void  testAbsIntegerEncoder ()  {ByteBuf  byteBuf  =  Unpooled.buffer();for  (int  i  =  0 ; i < 10 ; i++) {1 );EmbeddedChannel  channel  =  new  EmbeddedChannel (new  AbsIntegerEncoder ());for  (int  i  =  0 ; i < 10 ; i++) {int ) channel.readOutbound()));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  class  FixedLengthFrameDecoder  extends  ByteToMessageDecoder  {private  final  int  frameLength;public  FixedLengthFrameDecoder (int  frameLength)  {if  (frameLength <= 0 ) {throw  new  IllegalArgumentException ("frameLength must be a positive integer: "  + frameLength);this .frameLength = frameLength;@Override protected  void  decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  throws  Exception {while  (in.readableBytes() >= frameLength) {ByteBuf  buf  =  in.readBytes(frameLength);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public  class  FrameChunkDecoderTest  {@Test public  void  testFrameChunkDecoderException ()  {ByteBuf  byteBuf  =  Unpooled.buffer();for  (int  i  =  0 ; i < 10 ; i++) {ByteBuf  input  =  byteBuf.duplicate();EmbeddedChannel  channel  =  new  EmbeddedChannel (new  FrameChunkDecoder (3 ));2 )));try  {4 ));catch  (Exception e) {instanceof  TooLongFrameException);3 )));ByteBuf  read  =  channel.readInbound();2 ), read);4 ).readSlice(3 ), read);
网络协议 使用WebSocket WebSocket ,WebSocket协议是完全重新设计的协议,旨在为Web上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间可以在任意时刻传输消息,因此,这也就要求它们异步地处理消息回执。
使用WebSocket实现一个简单的聊天室,总体架构图如下:
大概代码结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 src
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import  io.netty.channel.*;import  io.netty.handler.codec.http.*;import  io.netty.handler.ssl.SslHandler;import  io.netty.handler.stream.ChunkedNioFile;import  java.io.File;import  java.io.FileNotFoundException;import  java.io.RandomAccessFile;import  java.net.URISyntaxException;import  java.net.URL;public  class  HttpRequestHandler  extends  SimpleChannelInboundHandler <FullHttpRequest> {private  final  String wsUri;private  static  File INDEX;static  {URL  location  =  HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();try  {"index.html" ;"file:" ) ? path : path.substring(5 );new  File (path);if  (!INDEX.exists()) {throw  new  FileNotFoundException ("path :"  + path + " not found" );catch  (URISyntaxException | FileNotFoundException e) {public  HttpRequestHandler (String wsUri)  {this .wsUri = wsUri;@Override protected  void  channelRead0 (ChannelHandlerContext ctx, FullHttpRequest request)  throws  Exception {if  (wsUri.equalsIgnoreCase(request.uri())) {else  {if  (HttpUtil.is100ContinueExpected(request)) {RandomAccessFile  file  =  new  RandomAccessFile (INDEX, "r" );HttpResponse  response  =  new  DefaultHttpResponse (request.protocolVersion(), HttpResponseStatus.OK);"text/html; charset=UTF-8" );boolean  keepAlive  =  HttpUtil.isKeepAlive(request);if  (keepAlive) {if  (ctx.pipeline().get(SslHandler.class) == null ) {new  DefaultFileRegion (file.getChannel(), 0 , file.length()));else  {new  ChunkedNioFile (file.getChannel()));ChannelFuture  future  =  ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);if  (!keepAlive) {private  static  void  send100Continue (ChannelHandlerContext ctx)  {FullHttpResponse  response  =  new  DefaultFullHttpResponse (HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);@Override public  void  exceptionCaught (ChannelHandlerContext ctx, Throwable cause)  throws  Exception {
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import  io.netty.channel.ChannelHandlerContext;import  io.netty.channel.SimpleChannelInboundHandler;import  io.netty.channel.group.ChannelGroup;import  io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import  io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;public  class  TextWebSocketFrameHandler  extends  SimpleChannelInboundHandler <TextWebSocketFrame> {private  final  ChannelGroup group;public  TextWebSocketFrameHandler (ChannelGroup group)  {this .group = group;@Override public  void  userEventTriggered (ChannelHandlerContext ctx, Object evt)  throws  Exception {if  (evt instanceof  WebSocketServerProtocolHandler.HandshakeComplete) {new  TextWebSocketFrame ("Client "  + ctx.channel() + " joined" ));else  {super .userEventTriggered(ctx, evt);@Override protected  void  channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg)  throws  Exception {
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import  io.netty.channel.Channel;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.ChannelPipeline;import  io.netty.channel.group.ChannelGroup;import  io.netty.example.ws.handler.HttpRequestHandler;import  io.netty.example.ws.handler.TextWebSocketFrameHandler;import  io.netty.handler.codec.http.HttpObjectAggregator;import  io.netty.handler.codec.http.HttpServerCodec;import  io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import  io.netty.handler.logging.LogLevel;import  io.netty.handler.logging.LoggingHandler;import  io.netty.handler.stream.ChunkedWriteHandler;public  class  ChatServerInitializer  extends  ChannelInitializer <Channel> {private  final  ChannelGroup group;public  ChatServerInitializer (ChannelGroup group)  {this .group = group;@Override protected  void  initChannel (Channel ch)  throws  Exception {ChannelPipeline  pipeline  =  ch.pipeline();new  HttpServerCodec ())new  ChunkedWriteHandler ())new  HttpObjectAggregator (64  * 1024 ))new  HttpRequestHandler ("/ws" ))new  WebSocketServerProtocolHandler ("/ws" ))new  TextWebSocketFrameHandler (group))new  LoggingHandler (LogLevel.INFO))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import  io.netty.channel.Channel;import  io.netty.channel.group.ChannelGroup;import  io.netty.handler.ssl.SslContext;import  io.netty.handler.ssl.SslHandler;import  javax.net.ssl.SSLEngine;public  class  SecureChatServerInitializer  extends  ChatServerInitializer  {private  final  SslContext sslContext;public  SecureChatServerInitializer (ChannelGroup channelGroup, SslContext sslContext)  {super (channelGroup);this .sslContext = sslContext;@Override protected  void  initChannel (Channel ch)  throws  Exception {super .initChannel(ch);SSLEngine  engine  =  sslContext.newHandler(ch.alloc()).engine();false );new  SslHandler (engine));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import  io.netty.bootstrap.ServerBootstrap;import  io.netty.channel.Channel;import  io.netty.channel.ChannelFuture;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.EventLoopGroup;import  io.netty.channel.group.ChannelGroup;import  io.netty.channel.group.DefaultChannelGroup;import  io.netty.channel.nio.NioEventLoopGroup;import  io.netty.channel.socket.nio.NioServerSocketChannel;import  io.netty.example.ws.initializer.ChatServerInitializer;import  io.netty.util.concurrent.ImmediateEventExecutor;import  org.slf4j.Logger;import  org.slf4j.LoggerFactory;import  java.net.InetSocketAddress;import  java.util.Objects;public  class  ChatServer  {private  final  ChannelGroup  channelGroup  =  new  DefaultChannelGroup (ImmediateEventExecutor.INSTANCE);private  final  EventLoopGroup  bossGroup  =  new  NioEventLoopGroup (1 );private  final  EventLoopGroup  workGroup  =  new  NioEventLoopGroup (2 );private  Channel channel;protected  final  static  String  SERVER_PORT  =  System.getProperty("port" , "9999" );private  final  static  Logger  logger  =  LoggerFactory.getLogger(ChatServer.class);protected  ChannelFuture start ()  {int  port  =  Integer.parseInt(SERVER_PORT);ServerBootstrap  serverBootstrap  =  new  ServerBootstrap ();ChannelFuture  future  =  serverBootstrap.bind(new  InetSocketAddress (port));new  Thread (this ::destroy));if  (future.isSuccess()) {"Chat Server start, port: {}" , port);else  {"Chat Server start failed, port: {}" , port);return  future;protected  ChannelInitializer<Channel> createInitializer (ChannelGroup channelGroup)  {return  new  ChatServerInitializer (channelGroup);protected  void  destroy ()  {if  (channel != null ) {public  static  void  main (String[] args)  {new  ChatServer ().start();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import  io.netty.channel.Channel;import  io.netty.channel.ChannelInitializer;import  io.netty.channel.group.ChannelGroup;import  io.netty.example.ws.initializer.SecureChatServerInitializer;import  io.netty.handler.ssl.SslContext;import  io.netty.handler.ssl.SslContextBuilder;import  io.netty.handler.ssl.util.SelfSignedCertificate;import  javax.net.ssl.SSLException;import  java.security.cert.CertificateException;public  class  SecureChatServer  extends  ChatServer  {private  final  SslContext sslContext;public  SecureChatServer (SslContext sslContext)  {this .sslContext = sslContext;@Override protected  ChannelInitializer<Channel> createInitializer (ChannelGroup channelGroup)  {return  new  SecureChatServerInitializer (channelGroup, sslContext);public  static  void  main (String[] args)  {try  {SelfSignedCertificate  certificate  =  new  SelfSignedCertificate ();SslContext  sslContext  =  SslContextBuilder.forServer(certificate.certificate(), certificate.privateKey()).build();new  SecureChatServer (sslContext).start();catch  (SSLException | CertificateException e) {
最后聊聊 连续差不多两周的学习,通过看书+练习的操作,让自己能够将书上的知识,真的运用起来,并且进一步加深理解。