异步迭代器
异步迭代器
异步迭代器是处理异步数据流的强大工具。本文将介绍异步迭代协议、for-await-of循环以及异步生成器函数,帮助您有效地处理异步序列。
异步迭代协议
异步迭代协议是 ES2018 引入的特性,它扩展了迭代协议以支持异步操作。与同步迭代器类似,异步迭代协议由两部分组成:异步可迭代协议和异步迭代器协议。
异步可迭代协议
一个对象要成为异步可迭代对象,必须实现 Symbol.asyncIterator
方法。该方法返回一个符合异步迭代器协议的对象。
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
current: 0,
max: 5,
async next() {
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 1000));
if (this.current < this.max) {
return { value: this.current++, done: false };
} else {
return { done: true };
}
}
};
}
};
异步迭代器协议
异步迭代器是一个具有 next()
方法的对象,该方法返回一个 Promise,该 Promise 解析为具有 value
和 done
属性的对象。
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
// 使用异步迭代器
asyncIterator.next().then(result => {
console.log(result); // { value: 0, done: false }
});
for await...of 循环
for await...of
是一种语法结构,用于遍历异步可迭代对象。它必须在异步函数或异步生成器函数中使用。
基本语法
async function example() {
for await (const value of asyncIterable) {
console.log(value);
}
}
example();
// 每隔 1 秒输出: 0, 1, 2, 3, 4
与 Promise 数组一起使用
for await...of
也可以用于遍历 Promise 数组:
async function processPromises() {
const promises = [
Promise.resolve('第一个结果'),
new Promise(resolve => setTimeout(() => resolve('第二个结果'), 1000)),
new Promise(resolve => setTimeout(() => resolve('第三个结果'), 2000))
];
for await (const result of promises) {
console.log(result);
}
}
processPromises();
// 输出:
// 第一个结果
// 第二个结果 (等待 1 秒后)
// 第三个结果 (再等待 1 秒后)
错误处理
在 for await...of
循环中,可以使用 try/catch 来处理异步操作中的错误:
async function handleErrors() {
const promises = [
Promise.resolve('成功'),
Promise.reject(new Error('失败')),
Promise.resolve('另一个成功')
];
try {
for await (const result of promises) {
console.log('结果:', result);
}
} catch (error) {
console.error('捕获到错误:', error.message);
}
}
handleErrors();
// 输出:
// 结果: 成功
// 捕获到错误: 失败
异步生成器函数
异步生成器函数是生成器函数和异步函数的结合,使用 async function*
语法定义。它可以包含 await
表达式和 yield
语句。
基本语法
async function* asyncGenerator() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(3);
}
async function example() {
const gen = asyncGenerator();
console.log((await gen.next()).value); // 1
console.log((await gen.next()).value); // 2
console.log((await gen.next()).value); // 3
console.log((await gen.next()).done); // true
}
example();
使用 for await...of 遍历异步生成器
async function* createAsyncGenerator() {
for (let i = 0; i < 5; i++) {
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 1000));
yield i;
}
}
async function consumeAsyncGenerator() {
for await (const value of createAsyncGenerator()) {
console.log(value);
}
}
consumeAsyncGenerator();
// 每隔 1 秒输出: 0, 1, 2, 3, 4
实际应用场景
1. 分页 API 请求
异步迭代器非常适合处理分页 API 请求,可以逐页获取数据并按需处理:
async function* fetchPaginatedData(url, pageSize = 10) {
let page = 1;
let hasMore = true;
while (hasMore) {
// 构建请求 URL
const requestUrl = `${url}?page=${page}&pageSize=${pageSize}`;
// 发送请求
const response = await fetch(requestUrl);
const data = await response.json();
// 提供当前页数据
yield data.items;
// 检查是否有更多页
hasMore = data.hasNextPage;
page++;
}
}
async function processAllData() {
const allItems = [];
try {
for await (const pageItems of fetchPaginatedData('https://api.example.com/data')) {
console.log(`获取到 ${pageItems.length} 条数据`);
// 处理当前页数据
for (const item of pageItems) {
// 处理每个项目
allItems.push(item);
}
}
console.log(`总共获取到 ${allItems.length} 条数据`);
} catch (error) {
console.error('获取数据时出错:', error);
}
}
processAllData();
2. 流式处理
异步迭代器可以用于处理流数据,例如文件流或网络流:
// 使用 Node.js 的 Readable 流
const fs = require('fs');
const { createReadStream } = fs;
async function processFileStream(filePath) {
const stream = createReadStream(filePath, { encoding: 'utf8', highWaterMark: 1024 });
let lineBuffer = '';
let lineCount = 0;
for await (const chunk of stream) {
// 处理每个数据块
lineBuffer += chunk;
// 按行分割
const lines = lineBuffer.split('\n');
// 保留最后一个可能不完整的行
lineBuffer = lines.pop();
// 处理完整的行
for (const line of lines) {
// 处理每一行
console.log(`处理第 ${++lineCount} 行: ${line.substring(0, 50)}...`);
}
}
// 处理最后一行(如果有)
if (lineBuffer.length > 0) {
console.log(`处理第 ${++lineCount} 行: ${lineBuffer.substring(0, 50)}...`);
}
console.log(`总共处理了 ${lineCount} 行`);
}
processFileStream('/path/to/large/file.txt');
3. 并发控制
异步迭代器可以用于控制并发操作的数量:
async function* batchProcess(items, batchSize = 3) {
// 将项目分成批次
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
// 并行处理批次中的所有项目
const results = await Promise.all(
batch.map(item => processItem(item))
);
// 提供批次结果
yield results;
}
}
// 模拟处理单个项目
async function processItem(item) {
// 模拟异步处理
await new Promise(resolve => setTimeout(resolve, 1000));
return `处理结果: ${item}`;
}
async function runBatchProcessing() {
const items = Array.from({ length: 10 }, (_, i) => `项目 ${i + 1}`);
console.log('开始批处理...');
for await (const batchResults of batchProcess(items)) {
console.log('批次结果:', batchResults);
}
console.log('批处理完成');
}
runBatchProcessing();
// 输出:
// 开始批处理...
// 批次结果: ['处理结果: 项目 1', '处理结果: 项目 2', '处理结果: 项目 3'] (等待 1 秒后)
// 批次结果: ['处理结果: 项目 4', '处理结果: 项目 5', '处理结果: 项目 6'] (再等待 1 秒后)
// 批次结果: ['处理结果: 项目 7', '处理结果: 项目 8', '处理结果: 项目 9'] (再等待 1 秒后)
// 批次结果: ['处理结果: 项目 10'] (再等待 1 秒后)
// 批处理完成
4. 事件流处理
异步迭代器可以用于处理事件流:
async function* createEventStream(element, eventName) {
const events = [];
let resolver = null;
// 创建事件监听器
const listener = event => {
events.push(event);
if (resolver) {
resolver();
resolver = null;
}
};
// 添加事件监听器
element.addEventListener(eventName, listener);
try {
while (true) {
// 如果没有事件,等待下一个事件
if (events.length === 0) {
await new Promise(resolve => {
resolver = resolve;
});
}
// 提供下一个事件
yield events.shift();
}
} finally {
// 清理:移除事件监听器
element.removeEventListener(eventName, listener);
}
}
async function handleClickEvents() {
const button = document.getElementById('myButton');
console.log('开始监听点击事件...');
// 只处理 5 个点击事件
let count = 0;
for await (const event of createEventStream(button, 'click')) {
console.log('点击位置:', event.clientX, event.clientY);
if (++count >= 5) {
break; // 处理 5 个事件后停止
}
}
console.log('停止监听点击事件');
}
// 当页面加载完成后开始处理
window.addEventListener('DOMContentLoaded', () => {
handleClickEvents();
});
高级技巧
1. 异步迭代器的组合
可以组合多个异步迭代器来创建更复杂的数据处理管道:
async function* map(asyncIterable, mapFn) {
for await (const item of asyncIterable) {
yield mapFn(item);
}
}
async function* filter(asyncIterable, filterFn) {
for await (const item of asyncIterable) {
if (filterFn(item)) {
yield item;
}
}
}
async function* take(asyncIterable, limit) {
let count = 0;
for await (const item of asyncIterable) {
if (count++ < limit) {
yield item;
} else {
break;
}
}
}
// 使用组合的异步迭代器
async function example() {
// 创建基础异步可迭代对象
async function* numbers() {
for (let i = 1; i <= 10; i++) {
await new Promise(resolve => setTimeout(resolve, 500));
yield i;
}
}
// 创建处理管道
const pipeline = take(
filter(
map(numbers(), n => n * 2),
n => n % 4 === 0
),
3
);
// 使用管道
for await (const item of pipeline) {
console.log(item); // 输出: 4, 8, 12
}
}
example();
2. 异步迭代器的取消
可以实现可取消的异步迭代器:
function createCancellableAsyncIterable(asyncIterable) {
let cancelled = false;
// 创建可取消的异步可迭代对象
const cancellableIterable = {
[Symbol.asyncIterator]() {
const iterator = asyncIterable[Symbol.asyncIterator]();
return {
async next() {
if (cancelled) {
return { done: true };
}
return iterator.next();
},
async return() {
cancelled = true;
if (iterator.return) {
return iterator.return();
}
return { done: true };
}
};
}
};
// 返回可迭代对象和取消函数
return {
iterable: cancellableIterable,
cancel() {
cancelled = true;
}
};
}
// 使用可取消的异步迭代器
async function example() {
// 创建基础异步可迭代对象
async function* counter() {
for (let i = 1; i <= 10; i++) {
await new Promise(resolve => setTimeout(resolve, 1000));
yield i;
}
}
// 创建可取消的异步迭代器
const { iterable, cancel } = createCancellableAsyncIterable(counter());
// 设置一个定时器在 3.5 秒后取消迭代
setTimeout(() => {
console.log('取消迭代');
cancel();
}, 3500);
// 使用可取消的迭代器
try {
for await (const value of iterable) {
console.log('值:', value);
}
} catch (error) {
console.error('错误:', error);
}
console.log('迭代完成');
}
example();
// 输出:
// 值: 1 (等待 1 秒后)
// 值: 2 (再等待 1 秒后)
// 值: 3 (再等待 1 秒后)
// 取消迭代 (再等待 0.5 秒后)
// 迭代完成
3. 异步迭代器的超时处理
可以为异步迭代器添加超时功能:
async function* withTimeout(asyncIterable, timeoutMs) {
const iterator = asyncIterable[Symbol.asyncIterator]();
while (true) {
// 创建一个超时 Promise
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject(new Error(`操作超时 (${timeoutMs}ms)`));
}, timeoutMs);
});
try {
// 竞争:迭代器的下一个值 vs 超时
const result = await Promise.race([
iterator.next(),
timeoutPromise
]);
if (result.done) {
break;
}
yield result.value;
} catch (error) {
// 处理超时或其他错误
throw error;
}
}
}
// 使用带超时的异步迭代器
async function example() {
// 创建一个有时会很慢的异步生成器
async function* slowNumbers() {
for (let i = 1; i <= 5; i++) {
// 随机延迟 500-2500ms
const delay = Math.floor(Math.random() * 2000) + 500;
await new Promise(resolve => setTimeout(resolve, delay));
console.log(`生成数字 ${i} (延迟 ${delay}ms)`);
yield i;
}
}
try {
// 使用 1000ms 超时
for await (const num of withTimeout(slowNumbers(), 1000)) {
console.log('接收到数字:', num);
}
} catch (error) {
console.error('迭代中断:', error.message);
}
}
example();
异步迭代器与其他 JavaScript 特性的结合
1. 与 async/await 结合
异步迭代器与 async/await 结合使用可以创建更清晰的异步代码:
async function* fetchDataFromMultipleSources(urls) {
for (const url of urls) {
try {
const response = await fetch(url);
const data = await response.json();
yield data;
} catch (error) {
console.error(`获取 ${url} 时出错:`, error);
// 可以选择跳过错误的 URL 或者抛出错误
// throw error;
}
}
}
async function processMultipleSources() {
const urls = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3'
];
for await (const data of fetchDataFromMultipleSources(urls)) {
// 处理每个源的数据
console.log('处理数据:', data.title);
// 可以在这里执行更多异步操作
await saveToDatabase(data);
}
}
// 模拟保存到数据库
async function saveToDatabase(data) {
await new Promise(resolve => setTimeout(resolve, 500));
console.log(`已保存: ${data.title}`);
}
processMultipleSources();
2. 与 Promise.all 结合
可以结合 Promise.all
和异步迭代器来实现并行处理:
async function* processInParallel(asyncIterable, batchSize, processFn) {
const iterator = asyncIterable[Symbol.asyncIterator]();
let batch = [];
let result;
do {
// 收集一批项目
batch = [];
for (let i = 0; i < batchSize; i++) {
result = await iterator.next();
if (!result.done) {
batch.push(result.value);
} else {
break;
}
}
if (batch.length > 0) {
// 并行处理批次
const results = await Promise.all(batch.map(processFn));
yield results;
}
} while (batch.length === batchSize);
}
// 使用示例
async function example() {
// 创建一个异步数据源
async function* dataSource() {
for (let i = 1; i <= 10; i++) {
await new Promise(resolve => setTimeout(resolve, 300));
yield `项目 ${i}`;
}
}
// 处理函数
async function processItem(item) {
await new Promise(resolve => setTimeout(resolve, 500));
return `已处理: ${item}`;
}
// 使用并行处理
for await (const batchResults of processInParallel(dataSource(), 3, processItem)) {
console.log('批次结果:', batchResults);
}
}
example();
3. 与 Web API 结合
异步迭代器可以与各种 Web API 结合使用:
// 使用 Fetch API 和异步迭代器处理大型响应
async function* streamJsonResponse(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
// 处理缓冲区中剩余的数据
if (buffer.trim()) {
try {
yield JSON.parse(buffer);
} catch (e) {
console.error('解析最后的 JSON 片段时出错:', e);
}
}
break;
}
// 将新数据添加到缓冲区
buffer += decoder.decode(value, { stream: true });
// 尝试从缓冲区中提取完整的 JSON 对象
let startPos = 0;
let endPos;
while ((endPos = buffer.indexOf('\n', startPos)) !== -1) {
const jsonStr = buffer.substring(startPos, endPos).trim();
if (jsonStr) {
try {
yield JSON.parse(jsonStr);
} catch (e) {
console.error('解析 JSON 时出错:', e);
}
}
startPos = endPos + 1;
}
// 保留未处理的部分
buffer = buffer.substring(startPos);
}
}
// 使用示例
async function processStreamedData() {
try {
for await (const item of streamJsonResponse('https://api.example.com/stream')) {
console.log('处理项目:', item);
}
} catch (error) {
console.error('处理流数据时出错:', error);
}
}
processStreamedData();
最佳实践
1. 错误处理
在使用异步迭代器时,应该始终包含适当的错误处理:
async function* safeAsyncGenerator(asyncIterable) {
try {
for await (const item of asyncIterable) {
try {
yield item;
} catch (itemError) {
console.error('处理项目时出错:', itemError);
// 可以选择继续或重新抛出
}
}
} catch (error) {
console.error('迭代过程中出错:', error);
// 可以选择提供默认值或重新抛出
throw error;
} finally {
// 清理资源
console.log('迭代完成,清理资源');
}
}
2. 资源管理
确保在使用异步迭代器时正确管理资源:
async function* withResource(resourceFactory, resourceReleaser) {
const resource = await resourceFactory();
try {
yield resource;
} finally {
await resourceReleaser(resource);
}
}
// 使用示例
async function example() {
// 资源工厂和释放函数
const createConnection = async () => {
console.log('创建数据库连接');
return { id: Math.random(), isOpen: true };
};
const closeConnection = async (connection) => {
console.log(`关闭连接 ${connection.id}`);
connection.isOpen = false;
};
// 使用资源
for await (const connection of withResource(createConnection, closeConnection)) {
console.log(`使用连接 ${connection.id}`);
// 使用连接执行操作
await new Promise(resolve => setTimeout(resolve, 1000));
}
// 连接会在这里自动关闭
}
example();
3. 性能考虑
在使用异步迭代器时,要注意性能影响:
// 批量处理以提高性能
async function* batchItems(asyncIterable, batchSize = 100) {
let batch = [];
for await (const item of asyncIterable) {
batch.push(item);
if (batch.length >= batchSize) {
yield batch;
batch = [];
}
}
// 提供最后一个可能不完整的批次
if (batch.length > 0) {
yield batch;
}
}
// 使用示例
async function processLargeDataset() {
// 假设这是一个大型数据源
async function* largeDataSource() {
for (let i = 0; i < 1000; i++) {
yield { id: i, data: `数据 ${i}` };
}
}
console.time('批量处理');
// 批量处理
for await (const batch of batchItems(largeDataSource(), 100)) {
console.log(`处理批次,包含 ${batch.length} 个项目`);
// 批量处理项目
}
console.timeEnd('批量处理');
}
processLargeDataset();
总结
异步迭代器是 JavaScript 中处理异步数据流的强大工具。它们结合了迭代器的惰性计算特性和 Promise 的异步处理能力,使得处理异步序列变得简单而优雅。
主要优点包括:
- 简化异步数据处理:提供了一种直观的方式来处理异步数据流
- 惰性计算:只在需要时才获取下一个值,节省资源
- 与语言特性集成:与
for await...of
循环和其他 JavaScript 特性无缝集成 - 错误处理:提供了清晰的错误处理机制
- 资源管理:支持适当的资源获取和释放模式
异步迭代器在处理分页 API 请求、流数据处理、事件流和并发控制等场景中特别有用。通过掌握异步迭代器,开发者可以编写更清晰、更高效的异步代码,更好地处理现代 Web 应用程序中的各种异步数据源。
在实际应用中,异步迭代器通常与 async/await、Promise 和其他 JavaScript 异步特性结合使用,创建强大而灵活的数据处理管道。随着 Web 应用程序变得越来越复杂,异步迭代器将成为处理异步数据流的重要工具。