作者:vivo 互联网服务器团队- Jin KAI
本文从JAVA NIO网络编程的基础知识讲到了Tars框架使用NIO进行网络编程的源码分析。
一、Tars框架基本介绍
Tars是腾讯开源的支持多语言的高性能RPC框架,起源于腾讯内部2008年至今一直使用的统一应用框架TAF(Total Application Framework),目前支持C++、Java、php、Nodejs、Go语言。
该框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。它集可扩展协议编解码、高性能RPC通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。
官方仓库地址:
https://Github.com/TarsCloud/Tars
vivo推送平台也深度使用了该框架,部署服务节点超过一千个,经过线上每日一百多亿消息推送量的考验。
此前已在vivo互联网技术公众号发布过《 Tars Java 客户端源码分析 》此篇文章为续集。
Tars-java 最新稳定版1.7.2以及之前的版本都使用Java NIO进行网络编程;本文将分别详细介绍java NIO的原理和Tars 使用NIO进行网络编程的细节。
二、Java NIO原理介绍
从1.4版本开始,Java提供了一种新的IO处理方式:NIO (New IO 或 Non-blocking IO) 是一个可以替代标准Java IO 的API,它是面向缓冲区而不是字节流,它是非阻塞的,支持IO多路复用。
2.1 Channels (通道) and Buffers (缓冲区)
标准的IO基于字节流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个完整流程。
Channel类型:
SocketChannel:
ServerSocketChannel:
通过 ServerSocketChannel.accept 方法监听新进来的连接,当accept方法返回的时候,它返回一个包含新进来的连接的SocketChannel,因此accept方法会一直阻塞到有新连接到达。
通常不会仅仅只监听一个连接,在while循环中调用 accept方法. 如下面的例子:
代码1:
while( true){ SocketChannel socketChannel = serverSocketChannel.accept; //do something with socketChannel... }ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept 方法会立刻返回,如果还没有新进来的连接,返回的将是null。因此,需要检查返回的SocketChannel是否是null。
代码2:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open; serverSocketChannel.socket.bind( new.NETSocketAddress( 8888)); serverSocketChannel.configureBlocking( false); while( true){ SocketChannel socketChannel = serverSocketChannel.accept; if(socketChannel != null){ //do something with socketChannel... } }Buffer类型:
Buffer的分配:
ByteBuffer buf = ByteBuffer.allocate(2048);
Buffer的读写:
一般是以下四个步骤:
mark 与 reset方法
通过调用Buffer.mark方法,可以标记Buffer中的一个特定position,之后可以通过调用Buffer.reset方法恢复到这个position。
duplicate
此方法返回承载先前字节缓冲区内容的新字节缓冲区。
remaining
limit 减去 position的值
2.2 Selector(选择器)
Java NIO引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程可以监听多个数据通道。要使用Selector,得向Selector注册Channel,然后调用它的select方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。
线程使用一个selector处理多个channel
代码3:
channel.configureBlocking( false); SelectionKey key = channel. register(selector,Selectionkey.OP_READ);注意register方法的第二个参数,这是一个监听的集合,即在通过Selector监听Channel时关注什么事件集合。
SelectionKey包含:
1) interest集合:selectionKey.interestOps 可以监听四种不同类型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ
2) ready集合:selectionKey.readyOps; ready 集合是通道已经准备就绪的操作的集合,提供4个方便的方法:
3) Channel:selectionKey.channel;
4) Selector:selectionKey.selector;
5) 可选的附加对象:
提示:
OP_ACCEPT和OP_CONNECT的区别:简单来说,客户端建立连接是connect,服务器准备接收连接是accept。一个典型的客户端服务器网络交互流程如下图
selectedKeys
一旦调用了select方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys方法,访问已选择键集(selected key set)中的就绪通道。
wakeUp
某个线程调用select方法后阻塞了,即使没有通道已经就绪,也有办法让其从select方法返回。只要让其它线程在阻塞线程调用select方法的对象上调用Selector.wakeup方法即可。阻塞在select方法上的线程会立马返回。如果有其它线程调用了wakeup方法,但当前没有线程阻塞在select方法上,下个调用select方法的线程会立即wake up。
close
用完Selector后调用其close方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
通过Selector选择通道:
三、 Tars NIO网络编程
了解完 Java NIO的原理,我们来看看Tars是如何使用NIO进行网络编程的。
Tars的网络模型是多reactor多线程模型。有一点特殊的是tars的reactor线程组里随机选一个线程处理网络事件,并且该线程同时也能处理读写。
核心类之间的关系如下:
3.1 一个典型的Java NIO服务端开发流程
3.2 Tars客户端发起请求到服务器的流程
3.3 Tars服务端启动步骤
代码4:
publicvoidbind( AppService appService) throws IOException { // 此处略去非关键代码 if(endpoint.type. equals( "tcp")) { // 1 this.selectorManager = newSelectorManager(Utils.getSelectorPoolSize, newServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false); // 2 this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay); this.selectorManager.start; ServerSocketChannel serverChannel = ServerSocketChannel.open; serverChannel.socket.bind( newInetSocketAddress(endpoint.host, endpoint.port), 1024); // 3 serverChannel.configureBlocking( false); selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT); // 4 } elseif(endpoint.type. equals( "udp")) { this.selectorManager = newSelectorManager( 1, newServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true); this.selectorManager.start; // UDP开启的是DatagramChannel DatagramChannel serverChannel = DatagramChannel.open; DatagramSocket socket = serverChannel.socket; socket.bind( newInetSocketAddress(endpoint.host, endpoint.port)); serverChannel.configureBlocking( false); // UDP协议不需要建连,监听的是OP_READ就绪事件 this.selectorManager.getReactor( 0).registerChannel(serverChannel, SelectionKey.OP_READ); } }3.4 Reactor线程启动流程
代码5:
publicvoidrun( ) { while(!Thread.interrupted) { selector. select; // 1 processRegister; // 2 Iterator<SelectionKey> iter = selector.selectedKeys.iterator; // 3 while(iter.hasNext) { SelectionKey key = iter.next; iter. remove; if(!key.isValid) continue; try{ if(key.attachment != null&& key.attachment instanceof Session) { ((Session) key.attachment).updateLastOperationTime; //4 } dispatchEvent(key); // 5 } catch(Throwable ex) { disConnectWithException(key, ex); } } processUnRegister; // 6 } }3.5 IO事件分发处理
每个reactor线程都有一个专门的Accepter类去处理各种IO事件。TCPAccepter可以处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter由于不需要建立连接所以只需要处理读和写两种事件。
1. 处理OP_ACCEPT
代码6:
publicvoidhandleAcceptEvent(SelectionKey key)throwsIOException { ServerSocketChannel server = (ServerSocketChannel) key.channel; // 1 SocketChannel channel = server.accept; channel.socket.setTcpNoDelay(selectorManager.isTcpNoDelay); channel.configureBlocking( false); Utils.setQosFlag(channel.socket); TCPSession session = newTCPSession(selectorManager); // 2 session.setChannel(channel); session.setStatus(SessionStatus.SERVER_CONNECTED); session.setKeepAlive(selectorManager.isKeepAlive); session.setTcpNoDelay(selectorManager.isTcpNoDelay); SessionManager.getSessionManager.registerSession(session); // 3 selectorManager.nextReactor.registerChannel(channel, SelectionKey.OP_READ, session); // 4 }2. 处理OP_CONNECT
代码7:
publicvoidhandleConnectEvent(SelectionKey key)throwsIOException { SocketChannel client = (SocketChannel) key.channel; // 1 TCPSession session = (TCPSession) key.attachment; //2 if(session == null) thrownewRuntimeException( "The session is null when connecting to ..."); try{ // 3 client.finishConnect; key.interestOps(SelectionKey.OP_READ); session.setStatus(SessionStatus.CLIENT_CONNECTED); } finally{ session.finishConnect; } }3.处理OP_WRITE、 处理OP_READ
调用session.read和session.doWrite 方法处理读写事件
代码8:
publicvoidhandleReadEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when reading data..."); session.read; } publicvoidhandleWriteEvent(SelectionKey key)throwsIOException { TCPSession session = (TCPSession) key.attachment; if(session == null) thrownewRuntimeException( "The session is null when writing data..."); session.doWrite; }3.6 seesion中网络读写的事件详细处理过程
1. 读事件处理
申请2k的ByteBuffer空间,读取channel中的数据到readBuffer中。根据sessionStatus判断是客户端读响应还是服务器读请求,分别进行处理。
代码9:
protectedvoid read throws IOException { int ret = readChannel; if( this.status == SessionStatus.CLIENT_CONNECTED) { readResponse; } elseif( this.status == SessionStatus.SERVER_CONNECTED) { readRequest; } else{ thrownew IllegalStateException( "The current session status is invalid. [status:"+ this.status + "]"); } if(ret < 0) { close; return; } } privateint readChannel throws IOException { int readBytes = 0, ret = 0; ByteBuffer data= ByteBuffer.allocate( 1024* 2); // 1 if(readBuffer == null) { readBuffer = IoBuffer.allocate(bufferSize); } // 2 while((ret = ((SocketChannel) channel).read( data)) > 0) { data.flip; // 3 readBytes += data.remaining; readBuffer.put( data.array, data.position, data.remaining); data.clear; } returnret < 0? ret : readBytes; }① 客户端读响应
从当前readBuffer中的内容复制到一个新的临时buffer中,并且切换到读模式,使用TarsCodec类解析出buffer内的协议字段到response,WorkThread线程通知Ticket处理response。如果response为空,则重置tempBuffer到mark的位置,重新解析协议。
代码10:
publicvoidreadResponse( ) { Response response = null; IoBuffer tempBuffer = null; tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { response = selectorManager.getProtocolFactory.getDecoder.decodeResponse(tempBuffer, this); } else{ response = null; } if(response != null) { if(response.getTicketNumber == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession.hashCode); selectorManager.getThreadPool.execute( newWorkThread(response, selectorManager)); } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }② 服务器读请求
任务放入线程池交给 WorkThread线程,最终交给Processor类出构建请求的响应体,包括分布式上下文,然后经过FilterChain的处理,最终通过jdk提供的反射方法invoke服务端本地的方法然后返回response。如果线程池抛出拒绝异常,则返回SERVEROVERLOAD = -9,服务端过载保护。如果request为空,则重置tempBuffer到mark的位置,重新解析协议。
代码11:
publicvoidreadRequest( ) { Request request = null; IoBuffer tempBuffer = readBuffer.duplicate.flip; while( true) { tempBuffer.mark; if(tempBuffer.remaining > 0) { request = selectorManager.getProtocolFactory.getDecoder.decodeRequest(tempBuffer, this); } else{ request = null; } if(request != null) { try{ request.resetBornTime; selectorManager.getThreadPool.execute( newWorkThread(request, selectorManager)); } catch(RejectedExecutionException e) { selectorManager.getProcessor.overload(request, request.getIOSession); } catch(Exception ex) { ex.printStackTrace; } } else{ tempBuffer.reset; readBuffer = resetIoBuffer(tempBuffer); break; } } }2. 写事件处理
同样也包括客户端写请求和服务端写响应两种,其实这两种都是往TCPSession中的LinkedBlockingQueue(有界队列最大8K)中插入ByteBuffer。LinkedBlockingQueue中的ByteBuffer最终会由TCPAcceptor中的handleWriteEvent监听写就绪事件并消费。
代码12:
protectedvoidwrite(IoBuffer buffer)throwsIOException { if(buffer == null) return; if(channel == null|| key == null) thrownewIOException( "Connection is closed"); if(! this.queue.offer(buffer.buf)) { thrownewIOException( "The session queue is full. [ queue size:"+ queue.size + " ]"); } if(key != null) { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); key.selector.wakeup; } }四、总结
本文主要介绍了Java NIO编程的基础知识 和 Tars-Java 1.7.2版本的网络编程模块的源码实现。
在最新的Tars-Java的master分支中我们可以发现网络编程已经由NIO改成了Netty,虽然Netty更加成熟稳定,但是作为学习者了解NIO的原理也是掌握网络编程的必经之路。
更多关于Tars框架的介绍可以访问:
https://tarscloud.org/
本文分析源码地址(v1.7.x分支):
https://github.com/TarsCloud/TarsJava
2023 源创会线下重启,基础软件技术面面谈。