-
Notifications
You must be signed in to change notification settings - Fork 159
/
Copy pathdrpyBatchFetch.js
127 lines (109 loc) · 4.28 KB
/
drpyBatchFetch.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import PQueue from 'p-queue';
import Queue from 'queue';
import fastq from 'fastq';
import axios from './axios.min.js';
/**
 * Fetches a batch of URLs through a p-queue `PQueue` with bounded concurrency.
 *
 * Each item is requested with `axios`. The item's `options` are used as the
 * base request config; `url`, `method`, `timeout` and `responseType` are then
 * set on top of it (so `responseType` is always `'text'`).
 *
 * A failing request never rejects the batch: the error is logged and that
 * item's slot in the result is `null`.
 *
 * NOTE(review): `log` is not imported in this file; it is assumed to be a
 * global supplied by the host environment — confirm.
 *
 * @param {Array<{url: string, options?: object}>} items - Requests to perform.
 * @param {number} [maxWorkers=5] - Value passed as `concurrency` to the queue.
 * @param {number} [timeoutConfig=5000] - Timeout used when an item has no
 *   truthy `options.timeout` of its own.
 * @returns {Promise<Array>} `response.data` for each item, in input order;
 *   `null` for items whose request failed. Empty array for empty/missing input.
 */
export const batchFetch1 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
    // Nothing to fetch: return early instead of crashing on items[0] below.
    if (items == null || items.length === 0) {
        return [];
    }

    const startedAt = Date.now();
    const queue = new PQueue({concurrency: maxWorkers});

    // One queued task per item; each task resolves to the body text or null.
    const pending = items.map((item) => queue.add(async () => {
        try {
            const response = await axios({
                ...item?.options,
                url: item.url,
                method: item?.options?.method || 'GET',
                timeout: item?.options?.timeout || timeoutConfig,
                responseType: 'text',
            });
            return response.data;
        } catch (error) {
            // item may itself be null/undefined, so read the url defensively:
            // throwing here would reject the whole batch.
            console.log(`[batchFetch][error] ${item?.url}: ${error}`);
            return null;
        }
    }));

    // Tasks never reject, so this waits for every request to settle.
    const results = await Promise.all(pending);
    const elapsed = Date.now() - startedAt;
    log(`PQueue 批量请求 ${items[0]?.url} 等 ${items.length}个地址 耗时${elapsed}毫秒:`);
    return results;
};
/**
 * Fetches a batch of URLs through a `Queue` (from the `queue` package) created
 * with `autostart: true` and bounded concurrency.
 *
 * Each item is requested with `axios`. The item's `options` are used as the
 * base request config; `url`, `method`, `timeout` and `responseType` are then
 * set on top of it (so `responseType` is always `'text'`).
 *
 * A failing request never rejects the batch: the error is logged and that
 * item's slot in the result is `null`.
 *
 * NOTE(review): `log` is not imported in this file; it is assumed to be a
 * global supplied by the host environment — confirm.
 *
 * @param {Array<{url: string, options?: object}>} items - Requests to perform.
 * @param {number} [maxWorkers=5] - Value passed as `concurrency` to the queue.
 * @param {number} [timeoutConfig=5000] - Timeout used when an item has no
 *   truthy `options.timeout` of its own.
 * @returns {Promise<Array>} `response.data` for each item, in input order;
 *   `null` for items whose request failed. Empty array for empty/missing input.
 */
export const batchFetch2 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
    // Nothing to fetch: return early instead of crashing on items[0] below.
    if (items == null || items.length === 0) {
        return [];
    }

    const startedAt = Date.now();
    const queue = new Queue({concurrency: maxWorkers, autostart: true});
    const results = [];

    // Each job writes into its own slot of `results` and then settles a
    // companion promise so completion of the whole batch can be awaited.
    const pending = items.map((item, index) => new Promise((resolve) => {
        queue.push(async () => {
            try {
                const response = await axios({
                    ...item?.options,
                    url: item.url,
                    method: item?.options?.method || 'GET',
                    timeout: item?.options?.timeout || timeoutConfig,
                    responseType: 'text',
                });
                results[index] = response.data;
            } catch (error) {
                // item may itself be null/undefined, so read the url defensively.
                console.log(`[batchFetch][error] ${item?.url}: ${error}`);
                results[index] = null;
            } finally {
                // Always settle, otherwise Promise.all below would hang forever.
                resolve();
            }
        });
    }));

    await Promise.all(pending);
    const elapsed = Date.now() - startedAt;
    log(`Queue 批量请求 ${items[0]?.url} 等 ${items.length}个地址 耗时${elapsed}毫秒:`);
    return results;
};
/**
 * Fetches a batch of URLs through a `fastq` queue with bounded concurrency.
 *
 * Each item is requested with `axios`. The item's `options` are used as the
 * base request config; `url`, `method`, `timeout` and `responseType` are then
 * set on top of it (so `responseType` is always `'text'`).
 *
 * A failing request never rejects the batch: the error is logged and that
 * item's slot in the result is `null`.
 *
 * NOTE(review): `log` is not imported in this file; it is assumed to be a
 * global supplied by the host environment — confirm.
 *
 * @param {Array<{url: string, options?: object}>} items - Requests to perform.
 * @param {number} [maxWorkers=5] - Concurrency passed to `fastq`.
 * @param {number} [timeoutConfig=5000] - Timeout used when an item has no
 *   truthy `options.timeout` of its own.
 * @returns {Promise<Array>} `response.data` for each item, in input order;
 *   `null` for items whose request failed. Empty array for empty/missing input.
 */
export const batchFetch3 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
    // Nothing to fetch: return early instead of crashing on items[0] below.
    if (items == null || items.length === 0) {
        return [];
    }

    const startedAt = Date.now();

    // Worker invoked by fastq for every pushed task. It stores the outcome in
    // the task's `results` array at the task's `index`.
    const worker = async ({item, index, results: sink}, callback) => {
        try {
            const response = await axios({
                ...item?.options,
                url: item.url,
                method: item?.options?.method || 'GET',
                timeout: item?.options?.timeout || timeoutConfig,
                responseType: 'text',
            });
            sink[index] = response.data;
        } catch (error) {
            // item may itself be null/undefined, so read the url defensively.
            console.log(`[batchFetch][error] ${item?.url}: ${error}`);
            sink[index] = null;
        }
        // Report completion without an error in both cases so one failed
        // request does not interrupt the rest of the queue.
        callback(null);
    };

    const results = [];
    const queue = fastq(worker, maxWorkers);

    // Push every item and turn its completion callback into a promise.
    const pending = items.map((item, index) => new Promise((resolve) => {
        queue.push({item, index, results}, () => resolve());
    }));

    await Promise.all(pending);
    const elapsed = Date.now() - startedAt;
    log(`fastq 批量请求 ${items[0]?.url} 等 ${items.length}个地址 耗时${elapsed}毫秒:`);
    return results;
};