Skip to content

Commit 1a29ea2

Browse files
author
Taois
committed
feat: ws
1 parent b0ae381 commit 1a29ea2

File tree

5 files changed

+572
-5
lines changed

5 files changed

+572
-5
lines changed

controllers/index.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
* 统一管理和注册所有控制器路由
44
* 提供应用程序的所有API端点和功能模块
55
*/
6-
6+
import formBody from '@fastify/formbody';
7+
import websocket from '@fastify/websocket';
78
// 静态文件服务控制器
89
import staticController from './static.js';
910
// 文档服务控制器
@@ -44,6 +45,8 @@ import ftpProxyController from './ftp-proxy.js';
4445
import fileProxyController from './file-proxy.js';
4546
import m3u8ProxyController from './m3u8-proxy.js';
4647
import unifiedProxyController from './unified-proxy.js';
48+
// WebSocket控制器
49+
import websocketController from './websocket.js';
4750

4851
/**
4952
* 注册所有路由控制器
@@ -52,6 +55,10 @@ import unifiedProxyController from './unified-proxy.js';
5255
* @param {Object} options - 路由配置选项
5356
*/
5457
export const registerRoutes = (fastify, options) => {
58+
// 注册插件以支持 application/x-www-form-urlencoded
59+
fastify.register(formBody);
60+
// 注册WebSocket插件
61+
fastify.register(websocket);
5562
// 注册静态文件服务路由
5663
fastify.register(staticController, options);
5764
// 注册文档服务路由
@@ -93,4 +100,6 @@ export const registerRoutes = (fastify, options) => {
93100
fastify.register(m3u8ProxyController, options);
94101
// 注册统一代理路由
95102
fastify.register(unifiedProxyController, options);
103+
// 注册WebSocket路由
104+
fastify.register(websocketController, options);
96105
};

controllers/websocket.js

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/**
2+
* WebSocket控制器模块
3+
* 提供WebSocket连接管理、实时日志广播和客户端通信功能
4+
* @module websocket-controller
5+
*/
6+
7+
import {validateBasicAuth} from "../utils/api_validate.js";
8+
import {toBeijingTime} from "../utils/datetime-format.js";
9+
10+
// WebSocket 客户端管理
11+
const wsClients = new Set();
12+
13+
// 原始 console 方法备份
14+
const originalConsole = {
15+
log: console.log,
16+
error: console.error,
17+
warn: console.warn,
18+
info: console.info,
19+
debug: console.debug,
20+
time: console.time,
21+
timeEnd: console.timeEnd,
22+
};
23+
24+
// 广播消息到所有 WebSocket 客户端
25+
function broadcastToClients(message) {
26+
const deadClients = [];
27+
28+
wsClients.forEach(client => {
29+
try {
30+
// 使用WebSocket的OPEN常量进行状态检查
31+
if (client.readyState === client.OPEN) {
32+
client.send(message);
33+
} else {
34+
deadClients.push(client);
35+
}
36+
} catch (error) {
37+
originalConsole.error('Error broadcasting to client:', error);
38+
deadClients.push(client);
39+
}
40+
});
41+
42+
// 清理断开的连接
43+
deadClients.forEach(client => wsClients.delete(client));
44+
}
45+
46+
// Console 拦截器
47+
function interceptConsole() {
48+
const createInterceptor = (level, originalMethod) => {
49+
return function (...args) {
50+
// 调用原始方法
51+
originalMethod.apply(console, args);
52+
53+
// 广播到所有 WebSocket 客户端
54+
if (wsClients.size > 0) {
55+
const message = {
56+
type: 'console',
57+
level: level,
58+
timestamp: toBeijingTime(new Date()),
59+
content: args.map(arg =>
60+
typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg)
61+
).join(' ')
62+
};
63+
64+
broadcastToClients(JSON.stringify(message));
65+
}
66+
};
67+
};
68+
69+
console.log = createInterceptor('log', originalConsole.log);
70+
console.error = createInterceptor('error', originalConsole.error);
71+
console.warn = createInterceptor('warn', originalConsole.warn);
72+
console.info = createInterceptor('info', originalConsole.info);
73+
console.debug = createInterceptor('debug', originalConsole.debug);
74+
console.time = createInterceptor('time', originalConsole.time);
75+
console.timeEnd = createInterceptor('timeEnd', originalConsole.timeEnd);
76+
}
77+
78+
// 恢复原始 console
79+
function restoreConsole() {
80+
console.log = originalConsole.log;
81+
console.error = originalConsole.error;
82+
console.warn = originalConsole.warn;
83+
console.info = originalConsole.info;
84+
console.debug = originalConsole.debug;
85+
console.time = originalConsole.time;
86+
console.timeEnd = originalConsole.timeEnd;
87+
}
88+
89+
// 启动 console 拦截
90+
interceptConsole();
91+
92+
/**
93+
* WebSocket控制器插件
94+
* @param {Object} fastify - Fastify实例
95+
* @param {Object} options - 插件选项
96+
* @param {Function} done - 完成回调
97+
*/
98+
export default (fastify, options, done) => {
99+
// 注册WebSocket路由
100+
fastify.register(async function (fastify) {
101+
/**
102+
* WebSocket连接路由
103+
* GET /ws - 建立WebSocket连接
104+
*/
105+
fastify.get('/ws', {websocket: true}, (socket, req) => {
106+
const clientId = Date.now() + Math.random();
107+
originalConsole.log(`WebSocket client connected: ${clientId}`);
108+
originalConsole.log('Socket type:', typeof socket);
109+
originalConsole.log('Socket has send method:', typeof socket.send);
110+
111+
// 添加到客户端集合
112+
wsClients.add(socket);
113+
114+
// 设置连接属性
115+
socket.clientId = clientId;
116+
socket.isAlive = true;
117+
socket.lastPing = Date.now();
118+
119+
// 发送欢迎消息 - 先检查send方法是否存在
120+
if (typeof socket.send === 'function') {
121+
socket.send(JSON.stringify({
122+
type: 'welcome',
123+
message: 'WebSocket connection established',
124+
clientId: clientId,
125+
timestamp: Date.now()
126+
}));
127+
} else {
128+
originalConsole.error('Socket does not have send method');
129+
}
130+
131+
// 设置心跳检测
132+
const heartbeatInterval = setInterval(() => {
133+
if (!socket.isAlive) {
134+
originalConsole.log(`Client ${clientId} failed heartbeat, terminating`);
135+
clearInterval(heartbeatInterval);
136+
socket.terminate();
137+
return;
138+
}
139+
140+
socket.isAlive = false;
141+
socket.ping();
142+
}, 30000); // 30秒心跳检测
143+
144+
// 处理pong响应
145+
socket.on('pong', () => {
146+
socket.isAlive = true;
147+
});
148+
149+
// 处理消息
150+
socket.on('message', (message) => {
151+
try {
152+
const data = JSON.parse(message.toString());
153+
originalConsole.log(`Received from ${clientId}:`, data);
154+
155+
// 回显消息
156+
if (socket.readyState === socket.OPEN) {
157+
socket.send(JSON.stringify({
158+
type: 'echo',
159+
originalMessage: data,
160+
timestamp: Date.now(),
161+
clientId: clientId
162+
}));
163+
}
164+
} catch (error) {
165+
originalConsole.error('Error processing message:', error);
166+
if (socket.readyState === socket.OPEN) {
167+
socket.send(JSON.stringify({
168+
type: 'error',
169+
message: 'Invalid JSON format',
170+
timestamp: Date.now()
171+
}));
172+
}
173+
}
174+
});
175+
176+
// 处理连接关闭
177+
socket.on('close', (code, reason) => {
178+
originalConsole.log(`Client ${clientId} disconnected: ${code} ${reason}`);
179+
wsClients.delete(socket);
180+
clearInterval(heartbeatInterval);
181+
});
182+
183+
// 处理错误
184+
socket.on('error', (error) => {
185+
originalConsole.error(`WebSocket error for client ${clientId}:`, error);
186+
wsClients.delete(socket);
187+
clearInterval(heartbeatInterval);
188+
});
189+
});
190+
});
191+
192+
/**
193+
* WebSocket状态查询接口
194+
* GET /ws/status - 获取WebSocket服务状态
195+
*/
196+
fastify.get('/ws/status', {preHandler: validateBasicAuth}, async (request, reply) => {
197+
return {
198+
status: 'ok',
199+
timestamp: toBeijingTime(new Date()),
200+
clients: wsClients.size,
201+
console_intercepted: true
202+
};
203+
});
204+
205+
/**
206+
* 手动广播接口
207+
* POST /ws/broadcast - 向所有WebSocket客户端广播消息
208+
*/
209+
fastify.post('/ws/broadcast', {preHandler: validateBasicAuth}, async (request, reply) => {
210+
const {message} = request.body;
211+
if (!message) {
212+
return reply.code(400).send({error: 'Message is required'});
213+
}
214+
215+
const broadcastMessage = {
216+
type: 'broadcast',
217+
timestamp: toBeijingTime(new Date()),
218+
content: message
219+
};
220+
221+
broadcastToClients(JSON.stringify(broadcastMessage));
222+
223+
return {
224+
status: 'ok',
225+
timestamp: toBeijingTime(new Date()),
226+
message: 'Message broadcasted',
227+
clients: wsClients.size
228+
};
229+
});
230+
231+
done();
232+
};
233+
234+
// 导出工具函数供其他模块使用
235+
export {wsClients, broadcastToClients, originalConsole, interceptConsole, restoreConsole};

index.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import path from 'path';
33
import os from 'os';
44
import qs from 'qs';
55
import {fileURLToPath} from 'url';
6-
import formBody from '@fastify/formbody';
76
import {validateBasicAuth, validateJs, validatePwd, validatHtml} from "./utils/api_validate.js";
87
import {startAllPlugins} from "./utils/pluginManager.js";
98
// 注册自定义import钩子
@@ -38,9 +37,6 @@ const configDir = path.join(__dirname, 'config');
3837
const pluginProcs = startAllPlugins(__dirname);
3938
// console.log('pluginProcs:', pluginProcs);
4039

41-
// 注册插件以支持 application/x-www-form-urlencoded
42-
fastify.register(formBody);
43-
4440
// 添加钩子事件
4541
fastify.addHook('onReady', async () => {
4642
try {

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"dependencies": {
3131
"@fastify/formbody": "^7.4.0",
3232
"@fastify/static": "7.0.4",
33+
"@fastify/websocket": "^10.0.1",
3334
"axios": "^1.7.9",
3435
"basic-ftp": "^5.0.5",
3536
"cheerio": "^1.0.0",

0 commit comments

Comments
 (0)