您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

解密 SSE,像 ChatGPT 一样返回流式响应

时间:2023-11-20 14:31:10  来源:微信公众号  作者:古明地觉的编程教室

我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结果之后,主动推送给客户端。

比如 ChatGPT,它在生成内容时,也是生成一部分,就主动向客户端推送一部分。而在这个过程中,客户端不需要做任何事情,只需等待 ChatGPT 服务端返回内容即可。

说到这儿,你肯定想到了 WebSocket,没错这是一种解决方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的应用层传输协议,只不过在握手的时候搭了 HTTP 的便车,利用 HTTP 本身的协议升级特性,伪装成 HTTP,这样就能绕过浏览器沙箱、网络防火墙等限制。

当完成握手之后,后续传输的数据就不再是 HTTP 报文,而是 WebSocket 格式的二进制帧。所以这两者完全是不同的协议,那有没有一种办法,我们仍然使用 HTTP 协议,同时还能让服务端主动推送数据呢?

答案是有的,也就是本文将要介绍的 SSE 技术,它的英文全称是 Server-Sent Events(服务端推送事件)。通过 SSE 可以让服务端即时推送数据到客户端,而不需要客户端轮询服务端以获取更新。

解密 SSE,像 ChatGPT 一样返回流式响应图片

到这你可能会问,那 WebSocket 和 SSE 有什么区别呢?

1)通信方式

WebSocket 提供全双工通信,服务端和客户端都可以在同一个连接上同时发送和接收数据。最重要的是,WebSocket 独立于 HTTP 协议,尽管它开始于一个 HTTP 握手。

SSE 仅提供服务端到客户端的单向通信,客户端不能通过 SSE 给服务端发信息。

2)协议和实现

WebSocket 使用自己的协议(ws:// 或 wss://),需要服务端和客户端都支持,并且协议比较复杂。

SSE 则是使用标准的 HTTP 协议,实现起来更简单,尤其是在服务端。

3)适用场景

WebSocket 适用于服务端和客户端之间双向实时通信的场景,如在线游戏、聊天应用等。

SSE 适用于服务端向客户端单向推送数据的场景,如消息通知、数据更新。并且 SSE 自动支持断线重连,而 WebSocket 则需要额外部署。

4)复杂性和资源使用

WebSocket 由于其双向通信的能力,通常比 SSE 更复杂,可能需要更多的资源来维护和管理连接。

SSE 因为其单向性和基于 HTTP 的特性,它可以利用现有的网络基础设施,如代理服务器、负载均衡器和防火墙等等,通常更容易实现和维护。

 

相信现在你已经明白 SSE 是做什么的了,它的目的就是让服务端能够主动推送数据给客户端。如果不需要和服务端动态交互,只是希望服务端在有数据的时候推过来,那么 WebSocket 就有些太重了,因为这意味着要替换 HTTP 协议,而使用 SSE 无疑是更好的选择。

SSE 是什么我们已经知道了,那它是怎么实现的呢?原理是什么呢?

1)建立连接

客户端发起一个标准的 HTTP 请求来开启 SSE 会话,这个请求的特殊之处在于它包含一个头字段。

Accept: text/event-stream

相当于客户端告诉服务端,期望接收 SSE 消息流。而服务端在看到该字段时,也知道这是一个 SSE 请求,于是立即向客户端返回响应头,注意:返回的只有响应头,里面会包含如下头字段。

Content-Type: text/event-stream

响应头返回之后标志着 SSE 连接成功建立,并且连接会保持开放状态,服务端后续可以随时通过此连接向客户端发送数据。此外当连接不小心断开时,客户端也会自动进行重连。

所以在普通的 HTTP 请求中,一旦服务端返回,那么请求结束了。虽然可以将 Connection 头字段设置为 keep-alive 保证连接不断开,但每次访问都包含了 HTTP 请求/响应的完整过程。

而在 SSE 中,服务端会保持一个开放的连接,只要有新数据可用,就会直接发送给客户端。所以服务端会将响应以流的形式发送给客户端,每次发送的消息都是响应流的一部分,而不是独立的 HTTP 响应。

因此 SSE 的服务端在发送数据时,并不遵循传统的一次请求,一次响应模式。它在建立连接之后会保持连接开放,并通过这个持续的连接流式地发送数据,这种方式就使得 SSE 非常适合实时数据推送的场景。

2)发送消息

客户端发送请求,服务端返回响应头之后,SSE 连接就建立成功了。此时客户端只需要躺平,安静地等待服务端的输出即可。所以现在的关键就在于服务端要返回什么格式的数据呢?很简单,一个基本的消息由以下几部分组成:

  • data:实际的消息数据;
  • id:可选,消息的唯一标识符,用于在连接重新建立时同步消息;
  • event:可选,定义事件类型,用于客户端区分消息的类型;
  • retry:可选,自动重连的时间(毫秒),如果连接中断,客户端在自动重新连接之前,需要等待多长时间;

注意:每个消息要以两个换行符(nn)结束,举个例子,我们发送一个 Hello World。

data: Hello Worldnn

也可以发送带有事件类型的消息:

event: userUpdate
data: {"username": "Serpen", "age": 18}nn

还是比较简单的,服务端可以保持连接并随时发送更多数据。然后客户端在收到时会进行处理,但不需要(也不能)对服务端作出任何回应,它只需要被动地接收来自服务端的数据即可。当服务端认为数据已经全部发送完毕、无需再发时,那么便可以主动断开连接。

关于 SSE 的原理我们就解释清楚了,下面来实际编程实现它,这里我们先使用原生的 asyncio 实现 SSE。

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        """
        此函数负责从原始字节流中解析出请求头
        """
        headers = data.split(b"rnrn")[0].split(b"rn")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self,
                               reader: StreamReader,
                               writer: StreamWriter):
        """
        负责处理来自客户端的请求
        每来一个客户端连接,就会基于此函数创建一个协程
        并且自动传递两个参数:reader 和 writer
        reader.read  负责读取数据,等价于 socket.recv
        writer.write 负责发送数据,等价于 socket.send
        """
        # 获取客户端的请求报文,这里对请求方法、请求地址不做限制
        data = awAIt reader.readuntil(b"rnrn")
        # 解析出请求头
        request_headers = self.parse_request_headers(data)
        # 简单检测一下 accept 字段,如果不是建立 SSE,那么直接关闭连接
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        # 如果是 SSE 连接,那么返回响应头
        response_header = (
            b"HTTP/1.1 200 OKrn"
            b"Content-Type: text/event-streamrn"
            b"Cache-Control: no-cachern"
            b"Connection: keep-alivern"
            b'Access-Control-Allow-Origin: *rn'
            b"rn"
        )
        writer.write(response_header)
        await writer.drain()

        # 然后便可以不断地向客户端返回数据了
        for _ in range(5):
            # 每隔 1 秒返回数据
            data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        # 数据传输完毕
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        # 创建服务,第一个参数是一个回调函数
        # 当连接过来的时候就会根据此函数创建一个协程
        # 后面是绑定的 ip 和 端口
        server = await asyncio.start_server(self.handler_requests,
                                            self.host,
                                            self.port)
        # 然后开启无限循环
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

服务端代码编写完毕,下面编写前端代码。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            // 和服务端建立 SSE 连接
            var eventSource = new EventSource("http://localhost:9999");

            eventSource.onmessage = function (e) {
                // 将数据渲染在 <div id="data"></div> 的内部
                var data = e.data + "n";
                document.getElementById('data').innerText += data;
            };

            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

代码编写完毕,我们用浏览器打开 HTML 文件,便可看到如下效果。

解密 SSE,像 ChatGPT 一样返回流式响应

以上我们就简单实现了 SSE,当然为了加深印象,这里的后端是使用原生的 asyncio 编写的,但在工作中,我们会使用现成的 Web 框架,比如 FastAPI,Blacksheep 等等。

需要说明的是,虽然通过 SSE 技术可以实现类似 ChatGPT 的效果,但 ChatGPT 内部并没有用到 SSE,它内部是基于 HTTP 的分块传输实现的。因为 SSE 只能通过 GET 请求发出,并且无法自定义请求头。

如果想实现 ChatGPT 的效果,需要使用 HTTP 的分块传输。而像 FastAPI、BlackSheep 等框架提供的流式响应,便是基于 HTTP 的分块传输实现的,比如 FastAPI:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

App = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.get("/")
async def sse():
    return StreamingResponse(event_generator(), 
                             media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代码依旧可以正常访问,通过修改数据格式和 Content-Type 可以让其支持 SSE。但最正确的做法是直接访问 localhost:9999,效果如下:

解密 SSE,像 ChatGPT 一样返回流式响应图片

所以基于 StreamingResponse 可以实现 SSE,也可以直接访问。而直接访问的话,此时里面的 data: 和 rn 就是实体数据的一部分。并且这种方式和 ChatGPT 的工作机制是相似的,都使用了 HTTP 的分块传输,支持所有的请求方法,而 SSE 只支持 GET 请求。

BlackSheep 也是类似的,它同样也支持流式响应。

import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn

app = Application()
app.use_cors(
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西rnrn".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.router.get("/")
async def sse():
    return Response(
        200,
        cnotallow=StreamedContent(b"text/event-stream", event_generator),
    )

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

可以测试一下,效果是一样的。如果你不想实现 SSE,只是希望固定的数据以流的形式一点一点返回,那么记得将数据中多余的 data: 和 rn 给去掉,并最好修改 Content-Type 为合适的类型。

所以 SSE 一般用于需要服务端推数据,但数据不知道什么时候会过来,于是通过 SSE 保持连接开放。后续当服务端有数据了,直接通过连接发送给客户端即可。

而 FastAPI 和 BlackSheep 提供的流式响应更像是,返回的数据比较庞大,如果全部准备好再一次性返回,会让用户陷入长时间的等待,造成不好的体验。于是通过分块传输,准备好一部分就返回一部分。虽然整体时间没变,但可以让用户立刻获取到数据,从而提升用户体验。

比如 ChatGPT,当它回答的内容比较多的时候,那么整个过程耗费几十秒钟是常有的事情,假设 30 秒。相比让用户等待 30 秒,然后内容一下子刷出来,显然生成一部分返回一部分这种方式更让人喜欢。

因此使用 SSE 还是流式响应,则取决于你当前的业务。如果你返回的数据是确定的,只是准备的时间比较长,或者数据量比较大,那么推荐使用流式响应。

至于 SSE,在这些现成的 Web 框架里面,也可以通过流式响应来实现,只需要将 Content-Type 设置为 text/event-stream,并将数据加上前缀 data: 和后缀 rnrn。

但说实话,如果想实现 SSE,不建议通过流式响应来实现,而是使用专门的库。以 FastAPI 为例:

from sse_starlette.sse import EventSourceResponse

FastAPI 其实就是在 starlette 的基础上套了一层壳,通过安装 sse_starlette 可以让 FastAPI 更好地支持 SSE。

以上就是本文的内容,如果对你有帮助,就点个赞吧。



Tags:ChatGPT   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
ChatGPT官宣免注册,全球互联网变天!OpenAI将取代谷歌搜索?
新智元报道编辑:编辑部【新智元导读】OpenAI这份愚人节礼物,实在是太大了:今天起,ChatGPT不用注册,可以直接使用。用户狂欢,竞品颤抖,我们仿佛已经听到,谷歌搜索引擎这位巨人轰然倒...【详细内容】
2024-04-02  Search: ChatGPT  点击:(7)  评论:(0)  加入收藏
无需注册!OpenAI宣布放开ChatGPT使用限制
工智能初创公司OpenAI宣布,即日起用户无须注册即可开始使用ChatGPT的功能。OpenAI在最新公告中写道:“让ChatGPT等工具广泛可用,让人们能够体验到人工智能的好处,这是我们使命的...【详细内容】
2024-04-02  Search: ChatGPT  点击:(6)  评论:(0)  加入收藏
ChatGPT 突然放开了账户限制,面向所有人开放
大门终于打开。奥特曼 OpenAI 的旗舰产品 ChatGPT 突然宣布:将面向所有人开放,无论你有没有注册账户。从今天开始,访问 Chat.openai.com 将不再要求用户登录,用户将直接进入与 C...【详细内容】
2024-04-02  Search: ChatGPT  点击:(3)  评论:(0)  加入收藏
今天起,ChatGPT无需注册就能用了!
 来源:量子位    金磊 克雷西 发自 凹非寺  就在刚刚,OpenAI狠狠地open了一把:从今天起,ChatGPT打开即用,无需再注册帐号和登录了!  像这样,直接登录网站,然后就可以开启对...【详细内容】
2024-04-02  Search: ChatGPT  点击:(7)  评论:(0)  加入收藏
ChatGPT之父Altman两小时对谈,首聊GPT-5何时发布、llya去哪里了、Q*究竟是什么
Altman做客油管博主Lex Fridman科技博客 ,被追问了一个又一个辛辣的问题。长达两个小时的对谈,奥特曼从OpenAI宫斗、马斯克诉讼、Sora,一直聊到AGI与外星文明!本文重点梳理了长...【详细内容】
2024-03-20  Search: ChatGPT  点击:(8)  评论:(0)  加入收藏
ChatGPT主管最新访谈:未来AI和人类如何共处?
Peter Deng在最新访谈中表示:AI不会取代生产力,人类和AI只有合作才能释放真正潜力,ChatGPT比已知的更强大、最大的挑战在于理解用户需求。当地时间3月13日,OpenAI 消费产品副总...【详细内容】
2024-03-19  Search: ChatGPT  点击:(11)  评论:(0)  加入收藏
ChatGPT日耗电超50万度,大模型或带来“电荒”
未来两年内将由“缺硅”变为“缺电”,马斯克的预言可能正在变成现实。据《纽约客》杂志报道,OpenAI的热门聊天机器人ChatGPT每天可能要消耗超过50万千瓦时的电力,以响应用户的...【详细内容】
2024-03-11  Search: ChatGPT  点击:(31)  评论:(0)  加入收藏
OpenAI新功能:ChatGPT可调用自定义机器人,对话更高效!
近日,知名科技公司OpenAI推出了一项新的功能&mdash;&mdash;“对话中调用(@)自定义聊天机器人”,让用户在对话中无缝切换不同领域的机器人。这一功能的出现,为用户带来了极大的便...【详细内容】
2024-02-02  Search: ChatGPT  点击:(53)  评论:(0)  加入收藏
ChatGPT元年之后,AI重塑世界,人类如何与其“智慧共生”?
过去一年,人工智能(AI)凭借大语言模型的爆火迅速进入大众视野。它比以往任何时候都更强大,也更具亲和力。这不仅给未来生活带来了新希望,也在人们心中蒙上了一层担忧&mdash;&mdas...【详细内容】
2024-01-26  Search: ChatGPT  点击:(77)  评论:(0)  加入收藏
年度最热AI应用TOP 50,除了ChatGPT还有这么多宝藏
量子位 | 公众号 QbitAI百模齐发、AI工具乱杀的一年里,谁是真正赢家?ChatGPT访问量遥遥领先位居第一,但单次使用时长没超过平均线。Midjourney访问量年度第四,但下滑量位居第二...【详细内容】
2024-01-02  Search: ChatGPT  点击:(51)  评论:(0)  加入收藏
▌简易百科推荐
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(5)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(12)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(8)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(10)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(8)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
Kubernetes 究竟有没有 LTS?
从一个有趣的问题引出很多人都在关注的 Kubernetes LTS 的问题。有趣的问题2019 年,一个名为 apiserver LoopbackClient Server cert expired after 1 year[1] 的 issue 中提...【详细内容】
2024-03-15  云原生散修  微信公众号  Tags:Kubernetes   点击:(5)  评论:(0)  加入收藏
站内最新
站内热门
站内头条