支持 SSE 进行 POST 请求

发布于

EventSource 接口是 web 内容与服务器发送事件 (Server-Send Event) 通信的接口。它基于 HTTP 协议提供了一种简单、高效的方式,为浏览器中的网页提供实时更新功能。

基本使用像下面这样

const es = new EventSource('//api.example.com/ssedemo.php', {
  withCredentials: true,
})

es.onmessage = (event) => {
  console.log(event.data)
}

SSE 具有以下特点

  1. 单向通信: SSE 是服务器向客户端推送数据的单向通道
  2. 简单性和浏览器支持: SSE 使用简单的文本格式来传输事件,并在大多数现代浏览器中有内置支持。
  3. 持久连接: 使用 SSE 建立的连接是持久的。浏览器会自动保持与服务器的连接,并在中断后尝试重新连接
  4. 文本传输: 传输的数据是纯文本格式,事件流是文本类型,每个消息以一个或多个换行符隔开
  5. 自带重连机制: 如果连接因为任何原因断开,浏览器会自动重新连接

你或许已经注意到现在很多 AI 问答场景下使用的便是 SSE,但实际是在开启连接时客户端需要传一些参数到服务器,但像上面这样声明 EventSource, 传的仅有一个 url 其实默认就是 GET,所以可以将参数拼接到 url 上,不过参数比较多时就不太优雅了,毕竟 url 也有长度限制,那可不可以使用 POST 方式来开启 SSE 连接呢

这种情况下可以抛弃使用 EventSource,自己手动处理流式响应

import { Observable } from 'rxjs'

function fetchWithSSE(url: string, body: any) {
  return new Observable<string>((subscriber) => {
    fetch(url, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Accept: 'text/event-stream', // 声明接受 SSE
      },
      body: JSON.stringify(body),
    })
      .then((response) => {
        if (!response.ok || !response.body) {
          subscriber.error(new Error(`SSE 连接失败: ${response.status}`))
        }

        const reader = response.body.getReader()
        const decoder = new TextDecoder()
        let buffer = ''

        const readChunk = () => {
          reader
            .read()
            .then(({ done, value }) => {
              if (done) {
                subscriber.complete()
                return
              }

              buffer += decoder.decode(value, { stream: true })

              const lines = buffer.split('\n')
              buffer = lines.pop() || '' // 保留未完成的行

              lines.forEach((line) => {
                if (line.startsWith('data: ')) {
                  const data = line.replace('data: ', '').trim()
                  if (data) {
                    subscriber.next(data)
                  }
                }
              })

              readChunk()
            })
            .catch((err) => {
              subscriber.error(err)
            })
        }

        readChunk()

        return reader.cancel
      })
      .catch(subscriber.error)
  })
}

或者直接使用三方库 @microsoft/fetch-event-source

import { fetchEventSource } from '@microsoft/fetch-event-source'

await fetchEventSource('/api/sse', {
  onmessage(ev) {
    console.log(ev.data)
  },
})