Skip to content

Commit 431ea1e

Browse files
committed
update:手写队列
1 parent 3e113b1 commit 431ea1e

File tree

6 files changed

+160
-28
lines changed

6 files changed

+160
-28
lines changed

Diff for: drop_code/batchFetch.js

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import PQueue from 'p-queue';
2+
import axios from "../libs_drpy/axios.min";
3+
4+
// p-queue 的实现,不怎么兼容海阔的旧版
5+
globalThis.batchFetch = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
6+
const queue = new PQueue({concurrency: maxWorkers});
7+
8+
// 获取全局 timeout 设置
9+
const timeout = timeoutConfig;
10+
11+
// 遍历 items 并生成任务队列
12+
const promises = items.map((item) => {
13+
return queue.add(async () => {
14+
try {
15+
const response = await axios(
16+
Object.assign({}, item?.options, {
17+
url: item.url,
18+
method: item?.options?.method || 'GET',
19+
timeout: item?.options?.timeout || timeout,
20+
responseType: 'text',
21+
}),
22+
);
23+
return response.data;
24+
} catch (error) {
25+
console.log(`[batchFetch][error] ${item.url}: ${error}`);
26+
return null;
27+
}
28+
});
29+
});
30+
31+
// 执行所有任务
32+
return Promise.all(promises);
33+
};

Diff for: js/PTT[优].js

+3
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ var rule = {
101101
lists.push(list1);
102102
// log(v_tab_urls);
103103
if (v_tab_urls.length > 1) {
104+
let t1 = (new Date()).getTime();
104105
let reqUrls = v_tab_urls.slice(1).map(it => {
105106
return {
106107
url: it,
@@ -119,6 +120,8 @@ var rule = {
119120
lists.push([]);
120121
}
121122
});
123+
let t2 = (new Date()).getTime();
124+
log(`批量请求二级 ${input} 耗时${t2 - t1}毫秒:`);
122125
}
123126
let playUrls = lists.map(it => it.join('#'));
124127
VOD.vod_play_url = playUrls.join('$$$');

Diff for: libs_drpy/drpyInject.js

+80-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
// import axios, {toFormData} from 'axios';
22
import axios, {toFormData} from './axios.min.js';
33

4-
import PQueue from 'p-queue';
4+
// import PQueue from 'p-queue';
5+
// import Queue from 'queue';
6+
import Queue from './queue.js';
57
import crypto from 'crypto';
68
import https from 'https';
79
import fs from 'node:fs';
@@ -138,15 +140,84 @@ async function request(url, opt = {}) {
138140
return {headers: {}, content: ''};
139141
}
140142

143+
// globalThis.batchFetch = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
144+
// const queue = new PQueue({concurrency: maxWorkers});
145+
//
146+
// // 获取全局 timeout 设置
147+
// const timeout = timeoutConfig;
148+
//
149+
// // 遍历 items 并生成任务队列
150+
// const promises = items.map((item) => {
151+
// return queue.add(async () => {
152+
// try {
153+
// const response = await axios(
154+
// Object.assign({}, item?.options, {
155+
// url: item.url,
156+
// method: item?.options?.method || 'GET',
157+
// timeout: item?.options?.timeout || timeout,
158+
// responseType: 'text',
159+
// }),
160+
// );
161+
// return response.data;
162+
// } catch (error) {
163+
// console.log(`[batchFetch][error] ${item.url}: ${error}`);
164+
// return null;
165+
// }
166+
// });
167+
// });
168+
//
169+
// // 执行所有任务
170+
// return Promise.all(promises);
171+
// };
172+
173+
// globalThis.batchFetch = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
174+
// const queue = new Queue({concurrency: maxWorkers, autostart: true});
175+
//
176+
// // 获取全局 timeout 设置
177+
// const timeout = timeoutConfig;
178+
//
179+
// const results = [];
180+
// const promises = [];
181+
//
182+
// items.forEach((item, index) => {
183+
// promises.push(
184+
// new Promise((resolve) => {
185+
// queue.push(async () => {
186+
// try {
187+
// const response = await axios(
188+
// Object.assign({}, item?.options, {
189+
// url: item.url,
190+
// method: item?.options?.method || 'GET',
191+
// timeout: item?.options?.timeout || timeout,
192+
// responseType: 'text',
193+
// }),
194+
// );
195+
// results[index] = response.data;
196+
// resolve();
197+
// } catch (error) {
198+
// console.log(`[batchFetch][error] ${item.url}: ${error}`);
199+
// results[index] = null;
200+
// resolve();
201+
// }
202+
// });
203+
// }),
204+
// );
205+
// });
206+
//
207+
// await Promise.all(promises);
208+
// return results;
209+
// };
210+
141211
globalThis.batchFetch = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
142-
const queue = new PQueue({concurrency: maxWorkers});
212+
const queue = new Queue(maxWorkers);
143213

144214
// 获取全局 timeout 设置
145215
const timeout = timeoutConfig;
146216

147-
// 遍历 items 并生成任务队列
148-
const promises = items.map((item) => {
149-
return queue.add(async () => {
217+
const results = [];
218+
219+
items.forEach((item, index) => {
220+
queue.add(async () => {
150221
try {
151222
const response = await axios(
152223
Object.assign({}, item?.options, {
@@ -156,16 +227,16 @@ globalThis.batchFetch = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
156227
responseType: 'text',
157228
}),
158229
);
159-
return response.data;
230+
results[index] = response.data;
160231
} catch (error) {
161232
console.log(`[batchFetch][error] ${item.url}: ${error}`);
162-
return null;
233+
results[index] = null;
163234
}
164235
});
165236
});
166237

167-
// 执行所有任务
168-
return Promise.all(promises);
238+
await queue.onIdle();
239+
return results;
169240
};
170241

171242
function base64EncodeBuf(buff, urlsafe = false) {

Diff for: libs_drpy/queue.js

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
class Queue {
2+
constructor(concurrency = 1) {
3+
this.concurrency = concurrency;
4+
this.queue = [];
5+
this.activeCount = 0;
6+
}
7+
8+
async runTask(task) {
9+
this.activeCount++;
10+
try {
11+
await task();
12+
} catch (err) {
13+
console.error('Task failed:', err);
14+
} finally {
15+
this.activeCount--;
16+
this.next();
17+
}
18+
}
19+
20+
next() {
21+
if (this.queue.length > 0 && this.activeCount < this.concurrency) {
22+
const nextTask = this.queue.shift();
23+
this.runTask(nextTask);
24+
}
25+
}
26+
27+
add(task) {
28+
this.queue.push(task);
29+
this.next();
30+
}
31+
32+
onIdle() {
33+
return new Promise((resolve) => {
34+
const interval = setInterval(() => {
35+
if (this.queue.length === 0 && this.activeCount === 0) {
36+
clearInterval(interval);
37+
resolve();
38+
}
39+
}, 10);
40+
});
41+
}
42+
}
43+
44+
export default Queue;

Diff for: package.json

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
"fastify": "^4.15.0",
1717
"lodash": "^4.17.21",
1818
"mime-types": "^2.1.35",
19-
"p-queue": "^8.0.1",
2019
"qs": "^6.13.1",
2120
"rotating-file-stream": "^3.2.5",
2221
"tunnel": "^0.0.6"

Diff for: yarn.lock

-18
Original file line numberDiff line numberDiff line change
@@ -353,11 +353,6 @@ escape-html@~1.0.3:
353353
resolved "https://registry.npmmirror.com/escape-html/-/escape-html-1.0.3.tgz#0258eae4d3d0c0974de1c169188ef0051d1d1988"
354354
integrity sha512-NiSupZ4OeuGwr68lGIeym/ksIZMJodUGOSCZ/FSnTxcrekbvqrgdUxlJOMpijaKZVjAJrWrGs/6Jy8OMuyj9ow==
355355

356-
eventemitter3@^5.0.1:
357-
version "5.0.1"
358-
resolved "https://registry.npmmirror.com/eventemitter3/-/eventemitter3-5.0.1.tgz#53f5ffd0a492ac800721bb42c66b841de96423c4"
359-
integrity sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==
360-
361356
fast-content-type-parse@^1.1.0:
362357
version "1.1.0"
363358
resolved "https://registry.npmmirror.com/fast-content-type-parse/-/fast-content-type-parse-1.1.0.tgz#4087162bf5af3294d4726ff29b334f72e3a1092c"
@@ -653,19 +648,6 @@ on-exit-leak-free@^2.1.0:
653648
resolved "https://registry.npmmirror.com/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz#fed195c9ebddb7d9e4c3842f93f281ac8dadd3b8"
654649
integrity sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==
655650

656-
p-queue@^8.0.1:
657-
version "8.0.1"
658-
resolved "https://registry.npmmirror.com/p-queue/-/p-queue-8.0.1.tgz#718b7f83836922ef213ddec263ff4223ce70bef8"
659-
integrity sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==
660-
dependencies:
661-
eventemitter3 "^5.0.1"
662-
p-timeout "^6.1.2"
663-
664-
p-timeout@^6.1.2:
665-
version "6.1.3"
666-
resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-6.1.3.tgz#9635160c4e10c7b4c3db45b7d5d26f911d9fd853"
667-
integrity sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==
668-
669651
package-json-from-dist@^1.0.0:
670652
version "1.0.1"
671653
resolved "https://registry.npmmirror.com/package-json-from-dist/-/package-json-from-dist-1.0.1.tgz#4f1471a010827a86f94cfd9b0727e36d267de505"

0 commit comments

Comments
 (0)