SSE长连接技术-EventSource
EventSource
简介
SSE(Server-Sent Events)
是一种基于 HTTP 协议,用于实现服务器主动向客户端推送数据的技术。它在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送,而客户端不能发送数据给服务端
特点
- 单向通信
- 实时推送
- 轻量级
- 支持跨域、使用简单、支持自动重连
适合场景
- 数据大屏
- 消息推送
- 股票交易
- 在线聊天
最致命缺点:无法携带自定义请求头Header
,只支持Get
请求;
简单使用
// 创建 EventSource 实例
const evtSource = new EventSource("http://localhost:3000/events");
// 监听消息
evtSource.onmessage = function (event) {
console.log("接收到的消息:", event);
};
// 监听连接打开
evtSource.onopen = function () {
console.log("连接已建立");
};
// 监听错误
evtSource.onerror = function (err) {
console.error("EventSource 发生错误:", err);
};
🔥SSE.JS
简介
sse.js
是 JavaScript 的灵活 EventSource 替代品(完全兼容的 EventSource polyfill),旨在使用服务器发送事件 (SSE) 流,比标准 EventSource 具有更多的控制和配置。
特点
- 支持自定义
Header
- 支持
POST
请求类型
GIthub:sse.js
食用文档
- 安装
npm install sse.js
- 用法
import { SSE } from "sse.js";
// 创建 SSE 实例
var source = new SSE("http://192.168.7.165:3000/events", {
headers: {
"X-App": "MyApp",
"X-Token": "my-custom-token",
},
});
// 监听消息
source.onmessage = function (event) {
console.log("接收到的消息:", event);
};
// 监听连接打开
source.onopen = function () {
console.log("SSE 连接已建立");
};
// 监听错误
source.onerror = function (err) {
console.error("EventSource 发生错误:", err);
};
FetchEventSource
简介
FetchEventSource
提供了更多的灵活性和定制能力,能够实现如自动重试、身份验证、自定义事件处理等复杂功能
特点
- 支持自定义
Header
GIthub:FetchEventSource
食用文档
- 安装
npm install @microsoft/fetch-event-source
- 用法
import { fetchEventSource } from "@microsoft/fetch-event-source";
fetchEventSource("http://192.168.7.165:3000/events", {
headers: {
"X-App": "MyApp",
"X-Token": "my-custom-token",
},
onmessage(event) {
console.log("接收到的消息:", event);
},
});
EventSourcePolyfill
简介
EventSourcePolyfill
是一个用于实现 Server-Sent Events (SSE) 的 polyfill 库;
提供了对不支持原生 EventSource 接口的浏览器的支持,使得开发者可以在所有现代浏览器中使用 SSE 技术。
特点
- 支持低版本浏览器
- 支持自定义
Header
GIthub:EventSourcePolyfill
食用文档
// 创建 EventSourcePolyfill 实例
const evtSource = new EventSourcePolyfill("http://192.168.7.165:3000/events", {
headers: {
"X-App": "MyApp",
"X-Token": "my-custom-token",
},
});
// 监听消息
evtSource.onmessage = function (event) {
console.log("接收到的消息:", event);
};
// 监听连接打开
evtSource.onopen = function () {
console.log("SSE 连接已建立");
};
// 监听错误
evtSource.onerror = function (err) {
console.error("EventSource 发生错误:", err);
};
ModernEventSource-自定义实现
实现
JavaScript
/**
* 默认事件解析器类
* @class
*/
class DefaultEventParser {
/**
* 解析事件字符串
* @param {string} eventString - 要解析的事件字符串
* @returns {string|null} 解析后的数据,如果没有有效数据则返回 null
*/
parse(eventString) {
const lines = eventString
.split("\n")
.map((line) => line.trim())
.filter(Boolean);
return lines.length > 0 ? lines.join("\n") : null;
}
}
/**
* 自定义事件源类,用于处理服务器发送事件(SSE)
* @class
*/
class CustomEventSource {
/**
* 创建一个新的 CustomEventSource 实例
* @param {string} url - 服务器事件源的 URL
* @param {Object} [options={}] - 配置选项
* @param {Object} [options.headers={}] - 请求头
* @param {boolean} [options.autoReconnect=true] - 是否自动重连
* @param {number} [options.reconnectDelay=3000] - 重连延迟时间(毫秒)
* @param {number} [options.maxReconnectAttempts=5] - 最大重连尝试次数
* @param {boolean} [options.autoManageVisibility=true] - 是否自动管理页面可见性
* @param {EventParser} [eventParser] - 自定义事件解析器
*/
constructor(url, options = {}, eventParser) {
this.url = url;
this.headers = options.headers || {};
this.onmessage = null;
this.onerror = null;
this.onopen = null;
// 重连配置
this.autoReconnect = options.autoReconnect ?? true;
this.reconnectDelay = options.reconnectDelay || 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
// 页面可见性管理
this.autoManageVisibility = options.autoManageVisibility ?? true; // 默认开启可见性管理
this.isConnected = false;
this.wasConnected = false; // 记录页面隐藏前的连接状态
this.reconnectAttempts = 0;
this.abortController = null;
if (this.autoManageVisibility) {
this.handleVisibilityChange = this.handleVisibilityChange.bind(this);
document.addEventListener(
"visibilitychange",
this.handleVisibilityChange
);
}
if (document.visibilityState === "visible") {
this.connect();
}
this.eventParser = eventParser || new DefaultEventParser();
}
/**
* 处理页面可见性变化
* @private
*/
handleVisibilityChange() {
if (document.visibilityState === "visible") {
// 页面变为可见时,如果之前是连接状态,则重新连接
if (this.wasConnected) {
this.connect();
}
} else {
// 页面隐藏时,记录当前状态并断开连接
this.wasConnected = this.isConnected;
if (this.isConnected) {
this.close();
}
}
}
/**
* 连接到事件源
* @returns {Promise<void>}
*/
async connect() {
if (this.isConnected) {
return;
}
try {
this.abortController = new AbortController();
const response = await fetch(this.url, {
headers: this.headers,
signal: this.abortController.signal,
});
this.isConnected = true;
if (this.onopen) {
this.onopen();
}
this.reconnectAttempts = 0;
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const messages = buffer.split("\n\n");
buffer = messages.pop() || "";
messages.forEach((message) => {
const data = this.parseEvent(message);
if (data && this.onmessage) {
this.onmessage({ data });
}
});
}
} catch (error) {
if (error.name === "AbortError") {
console.log("连接被主动中断");
return;
}
this.handleError(error);
} finally {
this.isConnected = false;
}
}
/**
* 解析事件数据
* @private
* @param {string} eventString - 要解析的事件字符串
* @returns {string|null} 解析后的数据
*/
parseEvent(eventString) {
return this.eventParser.parse(eventString);
}
/**
* 尝试重新连接
* @private
* @returns {Promise<void>}
*/
async reconnect() {
if (!this.autoReconnect) {
console.log("自动重连已禁用");
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log("达到最大重连次数,停止重连");
this.close();
return;
}
this.reconnectAttempts++;
console.log(
`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`
);
this.close();
await new Promise((resolve) => setTimeout(resolve, this.reconnectDelay));
this.connect();
}
/**
* 处理错误
* @private
* @param {Error} error - 错误对象
*/
handleError(error) {
if (this.onerror) {
this.onerror(error);
}
this.reconnect();
}
/**
* 关闭连接
*/
close() {
this.isConnected = false;
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
}
/**
* 销毁实例,清理资源
*/
destroy() {
this.close();
if (this.autoManageVisibility) {
document.removeEventListener(
"visibilitychange",
this.handleVisibilityChange
);
}
}
}
export default CustomEventSource;
Typescript
// 自定义事件源配置选项接口
interface CustomEventSourceOptions {
headers?: Record<string, string>; // 添加 headers 选项
autoReconnect?: boolean; // 是否自动重连
reconnectDelay?: number; // 重连延迟时间(毫秒)
maxReconnectAttempts?: number; // 最大重连尝试次数
autoManageVisibility?: boolean; // 是否自动管理页面可见性
}
// 自定义事件对象接口
interface CustomEventSourceEvent {
data: string; // 事件数据
}
// 事件处理器类型定义
type EventHandler = (event: CustomEventSourceEvent) => void; // 消息处理器
type ErrorHandler = (error: Error) => void; // 错误处理器
type OpenHandler = () => void; // 连接打开处理器
// 事件解析器接口
interface EventParser {
parse(eventString: string): string | null; // 解析事件字符串
}
// 默认事件解析器实现
class DefaultEventParser implements EventParser {
parse(eventString: string): string | null {
// 将输入按行分割,去除空白字符,过滤空行,并重新组合
const lines = eventString
.split("\n")
.map((line) => line.trim())
.filter(Boolean);
return lines.length > 0 ? lines.join("\n") : null;
}
}
class CustomEventSource {
private url: string;
private headers: Record<string, string>;
private autoReconnect: boolean;
private reconnectDelay: number;
private maxReconnectAttempts: number;
private autoManageVisibility: boolean;
private isConnected: boolean;
private wasConnected: boolean;
private reconnectAttempts: number;
private abortController: AbortController | null;
private eventParser: EventParser;
public onmessage: EventHandler | null;
public onerror: ErrorHandler | null;
public onopen: OpenHandler | null;
constructor(
url: string,
options: CustomEventSourceOptions = {},
eventParser?: EventParser
) {
this.url = url;
this.headers = options.headers || {};
this.onmessage = null;
this.onerror = null;
this.onopen = null;
// 重连配置
this.autoReconnect = options.autoReconnect ?? true;
this.reconnectDelay = options.reconnectDelay || 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
// 页面可见性管理
this.autoManageVisibility = options.autoManageVisibility ?? true;
this.isConnected = false;
this.wasConnected = false;
this.reconnectAttempts = 0;
this.abortController = null;
if (this.autoManageVisibility) {
this.handleVisibilityChange = this.handleVisibilityChange.bind(this);
document.addEventListener(
"visibilitychange",
this.handleVisibilityChange
);
}
if (document.visibilityState === "visible") {
this.connect();
}
this.eventParser = eventParser || new DefaultEventParser();
}
private handleVisibilityChange(): void {
if (document.visibilityState === "visible") {
if (this.wasConnected) {
this.connect();
}
} else {
this.wasConnected = this.isConnected;
if (this.isConnected) {
this.close();
}
}
}
public async connect(): Promise<void> {
if (this.isConnected) {
return;
}
try {
this.abortController = new AbortController();
const response = await fetch(this.url, {
headers: this.headers,
signal: this.abortController.signal,
});
this.isConnected = true;
if (this.onopen) {
this.onopen();
}
this.reconnectAttempts = 0;
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const messages = buffer.split("\n\n");
buffer = messages.pop() || "";
messages.forEach((message) => {
const data = this.parseEvent(message);
if (data && this.onmessage) {
this.onmessage({ data });
}
});
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
console.log("连接被主动中断");
return;
}
this.handleError(error as Error);
} finally {
this.isConnected = false;
}
}
private parseEvent(eventString: string): string | null {
return this.eventParser.parse(eventString);
}
private async reconnect(): Promise<void> {
if (!this.autoReconnect) {
console.log("自动重连已禁用");
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log("达到最大重连次数,停止重连");
this.close();
return;
}
this.reconnectAttempts++;
console.log(
`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`
);
this.close();
await new Promise((resolve) => setTimeout(resolve, this.reconnectDelay));
this.connect();
}
private handleError(error: Error): void {
if (this.onerror) {
this.onerror(error);
}
this.reconnect();
}
public close(): void {
this.isConnected = false;
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
}
public destroy(): void {
this.close();
if (this.autoManageVisibility) {
document.removeEventListener(
"visibilitychange",
this.handleVisibilityChange
);
}
}
}
export default CustomEventSource;
食用文档
import ModernEventSource from "./ModernEventSource.js";
const eventSource = new ModernEventSource("http://localhost:3000/events", {
autoReconnect: true, // 启用自动重连
reconnectDelay: 2000, // 2秒后重连
maxReconnectAttempts: 3, // 最多重连3次
autoManageVisibility: true, // 启用页面可见性管理
headers: {
"X-App": "MyApp",
"X-Token": "my-custom-token",
},
});
eventSource.onopen = function () {
console.log("连接已建立");
};
eventSource.onmessage = function (event) {
console.log("接收到的消息:", event);
};
eventSource.onerror = function (error) {
console.error("SSE 错误:", error);
};
// 在页面卸载时清理资源
window.addEventListener("unload", () => {
eventSource.destroy();
});