您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

手写一个的在线聊天系统

时间:2022-07-06 14:35:32  来源:  作者:架构师jickly

一、目录介绍

  • 前置知识点
    • NIO
  • .NETty 的核心组件
    • Channel
    • Callback
    • Future 和 Promise
    • 事件和 ChannelHandler
  • Hello World

二、前置知识点

1、NIO

首先我们需要回顾一下,同步、异步、阻塞、非阻塞的相关概念。

  • 同步:调用 API 后,调用者能“马上”就知道操作的结果。
  • 异步:相对于同步,调用 API 后,调用者不能“马上”知道操作的结果,要等被调用方 回调 通知结果。
  • 阻塞:等待全部数据读取(写入)完成后,才返回。
  • 非阻塞:读取时,读多少返回多少;写入时,写入多少返回多少。不用等待,全部数据完成操作后,才返回。

NIO 是一种 同步非阻塞 的 I/O模型

  • 同步是指线程不断轮询 I/O 事件是否就绪。
  • 非阻塞是指线程在等待 I/O 的时候,可以同时做其他任务。

同步的核心是 选择器,选择器代替了线程本身轮询 I/O 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是 通道和缓冲区,当 I/O 事件就绪时,可以通过写到缓冲区,保证 I/O 的成功,而无需线程阻塞式地等待。

NIO主要有三大核心部分:

  • Channel(通道)
  • Buffer(缓冲区)
  • Selector(选择器)

传统 I/O 基于 字节流和字符流 进行操作,而 NIO 基于 Channel 和 Buffer 进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector 用于监听多个通道的事件(连接打开,数据到达等)。因此,单个线程可以监听多个数据通道,如下图所示:

手写一个的在线聊天系统

NIO

三、Netty 的核心组件

1、Channel

Channel 是一个通道,用于连接字节缓冲区 Buffer 和另一端的实体。在 NIO 网络编程模型中,服务端和客户端进行 I/O 数据交互(得到彼此推送的信息)的媒介就是 Channel。

Netty 对 JDK 原生的 ServerSocketChannel 进行了封装和增强。

Netty的Channel增加了如下的组件:

  • id 标识唯一身份信息
  • 可能存在的 parent Channel
  • 管道 pepiline
  • 用于数据读写的 unsafe 内部类
  • 事件循环执行器 NioEventLoop

Channel可以分成两类:

  • 服务端: NIOServerSocketChannel
  • 客户端: NioSocketChannel

具体依赖关系如下图所示:

服务端: NioServerSocketChannel

手写一个的在线聊天系统

NioServerSocketChannel

客户端: NioSocketChannel

手写一个的在线聊天系统

NioSocketChannel

2、Callback

callback 就是回调,一个方法可以在适当的时候回过头来调用这个 callback 方法。callback 是用于通知相关方某个操作已经完成最常用的方法之一。

Netty 在处理事件时内部使用了 callback。当一个 callback 被触发,事件可以被 ChannelHandler 的接口实现处理。

一个简单的例子如下所示:

public class ConnectHandler extends ChannelInboundHandlerAdapter {
    // 当一个新的连接建立时,channelActive 被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
           System.out.println(ctx.channel().remoteAddress());
    }
}

当一个新的连接建立后,ChannelHandler 的 callback 方法 channelActive() 会被调用,然后打印一条消息。

这个 ConnectHandler 实例(相当于被调用者)以参数的形式传入创建 Channel 连接的函数(调用者)中,之后这个函数创建新连接后,就会回来调用这个 ConnectHandler 的 channelActive 方法,这个过程就叫回调。

3、Future 和 Promise

Future 和 Promise 起源于函数式编程,目的是将值(Future)与其计算方式(Promise)分离,从而允许更灵活地进行计算,特别是通过并行化。

Future 表示目标计算的返回值,Promise 表示计算的方式,这个模型将返回结果和计算逻辑分离,目的是为了让计算逻辑不影响返回结果,从而抽象出一套异步编程模型。它们之间的纽带就是 Callback。

简单来说:Future 表示一个 异步任务的结果,针对这个结果可以添加 Callback 方法以便在任务 执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。

在 Netty 中:

  • Future 接口定义了 isSuccess(),isCancellable(),cause() 等方法,这些判断异步执行状态的方法都是只读的。
  • Promise 接口在 extends Future 的基础上增加了 setSuccess(),setFAIlure() 等方法,这些方法是可写的,即 Promise 是可写的 Future。

4、事件(event) 和 ChannelHandler

ChannelHandler

Netty 是一个事件驱动的框架,所有的 event(事件) 都是由 Handler 来进行处理。

ChannelHandler 可以处理 I/O、拦截 I/O 或者将 event 传递给 ChannelPipeline 中的下一个 Handler 进行处理。

ChannelHandler 的结构很简单,只有三个方法,分别是:

void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

event

Netty 用细分的 event(事件) 来通知我们状态的变化或者操作的状况。这让我们可以基于发的 event 来触发适当的行为。这类行为可能包括:

  • 日志记录
  • 数据传送
  • 流控制
  • 应用逻辑

event 按输入或者输出数据流的关系来分类。可能被输入数据或者相关状态改变触发的 event 包括:

  • 活跃或者停用的连接
  • 读数据
  • 用户 event
  • 错误 event

而输出 event 则是会触发将来行为的操作的结果,可能会是:

  • 打开或者关闭到远端的连接
  • 写或者刷数据到一个 socket

每一个 event 都可以被分派到一个用户实现的 handler 对象的方法。

Hello World

一个简单的 websocket 服务端,如下所示:

Server 代码:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;

public class Server {

    public static void main(String[] args) throws InterruptedException {
        // 用来接收客户端传进来的连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用来处理已被接收的连接
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        // 创建 netty 服务
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    // 设置 NIO 模式
                    .channel(NioServerSocketChannel.class)
                    // 设置 tcp 缓冲区
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置发送缓冲区数据大小
                    .childOption(ChannelOption.SO_SNDBUF, 64 * 1024)
                    // 设置接收缓冲区数据大小
                    .option(ChannelOption.SO_RCVBUF, 64 * 1024)
                    // 保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HttpClient编解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 设置最大内容长度
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // WebSocket 数据压缩扩展
                            pipeline.addLast(new WebSocketServerCompressionHandler());
                            // WebSocket 握手、控制帧处理
                            pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
                            // 通道的初始化,数据传输过来进行拦截及执行
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            // 绑定端口启动服务
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerHandler 代码:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道激活(回调)");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 仅处理 TextWebSocketFrame
        if (msg instanceof TextWebSocketFrame) {
            String request = ((TextWebSocketFrame) msg).text();
            System.out.println("收到请求:" + request);
            ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("数据读取完成");
    }
}

pom 依赖

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>
</dependencies>

然后运行 Server 即可。

接下来我们来测试一下程序是否正常,这里使用到一个在线测试网站
http://www.easyswoole.com/wstool.html

连接上我们的服务,如下图所示:

手写一个的在线聊天系统

连接websocket

如果出现 OPENED => 127.0.0.1:8080 的提示,则表示连接成功。否则请排查是否程序和示例代码一致。

然后我们点击开始发送按钮,如果出现以下提示则表示,消息发送成功啦。

手写一个的在线聊天系统

发送消息1


手写一个的在线聊天系统

发送消息2

好了到这里,我们的 Hello World 已经完成了。



Tags:聊天系统   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
IM聊天系统安全手段之传输内容端到端加密技术
本文由融云技术团队分享,原题“互联网通信安全之端到端加密技术”,内容有较多修订和改动。 1、引言 在上篇《IM聊天系统安全手段之通信连接层加密技术》中,分享了关于通信连接...【详细内容】
2022-09-01  Search: 聊天系统  点击:(324)  评论:(0)  加入收藏
手写一个的在线聊天系统
一、目录介绍 前置知识点 NIO Netty 的核心组件 Channel Callback Future 和 Promise 事件和 ChannelHandler Hello World二、前置知识点1、NIO首先我们需要回顾一...【详细内容】
2022-07-06  Search: 聊天系统  点击:(430)  评论:(0)  加入收藏
浅谈聊天系统架构设计
由于多年前开发了一款聊天软件,今天朋友给我打电话,说他们公司准备开发一款内部使用的沟通交流工具,找我咨询关于即时聊天软件一些经验,于是跟他聊了一些关于这方面的东西,所以在...【详细内容】
2020-12-22  Search: 聊天系统  点击:(938)  评论:(0)  加入收藏
▌简易百科推荐
Meta如何将缓存一致性提高到99.99999999%
介绍缓存是一种强大的技术,广泛应用于计算机系统的各个方面,从硬件缓存到操作系统、网络浏览器,尤其是后端开发。对于Meta这样的公司来说,缓存尤为重要,因为它有助于减少延迟、扩...【详细内容】
2024-04-15    dbaplus社群  Tags:Meta   点击:(3)  评论:(0)  加入收藏
SELECT COUNT(*) 会造成全表扫描?回去等通知吧
前言SELECT COUNT(*)会不会导致全表扫描引起慢查询呢?SELECT COUNT(*) FROM SomeTable网上有一种说法,针对无 where_clause 的 COUNT(*),MySQL 是有优化的,优化器会选择成本最小...【详细内容】
2024-04-11  dbaplus社群    Tags:SELECT   点击:(3)  评论:(0)  加入收藏
10年架构师感悟:从问题出发,而非技术
这些感悟并非来自于具体的技术实现,而是关于我在架构设计和实施过程中所体会到的一些软性经验和领悟。我希望通过这些分享,能够激发大家对于架构设计和技术实践的思考,帮助大家...【详细内容】
2024-04-11  dbaplus社群    Tags:架构师   点击:(2)  评论:(0)  加入收藏
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(5)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(9)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(16)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(14)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(9)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(15)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(10)  评论:(0)  加入收藏
站内最新
站内热门
站内头条