我将为您完善异步迭代器的文档内容,包括异步迭代协议、for-await-of循环、异步生成器函数以及实际应用场景。
---
title: 异步迭代器
icon: javascript
order: 4
---
# 异步迭代器
异步迭代器是处理异步数据流的强大工具。本文将介绍异步迭代协议、for-await-of循环以及异步生成器函数,并探讨它们在处理流数据、分页API等场景中的应用。
## 异步迭代协议
### 基本概念
异步迭代协议是ES2018引入的标准,用于处理异步产生的数据序列。它扩展了ES2015中的迭代协议,使其能够处理异步数据源。
异步迭代协议由两部分组成:
1. **异步可迭代对象(Async Iterable)**:实现了`Symbol.asyncIterator`方法的对象
2. **异步迭代器(Async Iterator)**:由`Symbol.asyncIterator`方法返回的对象,具有`next()`方法,该方法返回一个Promise,该Promise解析为具有`value`和`done`属性的对象
```javascript
// 异步可迭代对象的基本结构
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
// 异步迭代器
next() {
// 返回一个Promise,解析为 {value, done} 对象
return Promise.resolve({ value: 'some value', done: false });
}
};
}
};
与同步迭代器的区别
异步迭代器与同步迭代器的主要区别在于:
- 使用
Symbol.asyncIterator
而不是Symbol.iterator
next()
方法返回Promise而不是直接返回结果对象- 需要使用
for-await-of
循环而不是for-of
循环来消费
// 同步迭代器
const syncIterable = {
[Symbol.iterator]() {
return {
next() {
return { value: 'sync value', done: false };
}
};
}
};
// 异步迭代器
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
next() {
return Promise.resolve({ value: 'async value', done: false });
}
};
}
};
for-await-of循环
基本用法
for-await-of
循环是专门为异步迭代器设计的语法,它允许以同步代码的风格迭代异步数据源:
async function consumeAsyncIterable(asyncIterable) {
try {
for await (const value of asyncIterable) {
console.log(value);
}
} catch (error) {
console.error('迭代过程中出错:', error);
}
}
注意事项:
for-await-of
循环只能在异步函数或异步生成器函数中使用- 循环会等待每个Promise解析后再继续下一次迭代
- 可以捕获迭代过程中的错误
与Promise.all的比较
for-await-of
循环和Promise.all
都可以处理多个Promise,但它们的行为不同:
async function compareApproaches() {
const promises = [
new Promise(resolve => setTimeout(() => resolve('First'), 1000)),
new Promise(resolve => setTimeout(() => resolve('Second'), 500)),
new Promise(resolve => setTimeout(() => resolve('Third'), 100))
];
// 使用Promise.all - 并行执行所有Promise
console.time('Promise.all');
const resultsAll = await Promise.all(promises);
console.log('Promise.all结果:', resultsAll); // ['First', 'Second', 'Third']
console.timeEnd('Promise.all'); // 大约1000ms
// 使用for-await-of - 按顺序等待每个Promise
console.time('for-await-of');
const results = [];
for await (const result of promises) {
results.push(result);
}
console.log('for-await-of结果:', results); // ['First', 'Second', 'Third']
console.timeEnd('for-await-of'); // 大约1600ms
}
compareApproaches();
主要区别:
Promise.all
并行执行所有Promise,等待所有完成for-await-of
按顺序等待每个Promise完成后再处理下一个- 对于独立的Promise集合,
Promise.all
通常更高效 - 对于需要按顺序处理的异步数据流,
for-await-of
更合适
异步生成器函数
基本语法
异步生成器函数是生成器函数和异步函数的结合,使用async function*
语法定义:
async function* createAsyncGenerator() {
yield Promise.resolve(1);
yield Promise.resolve(2);
yield Promise.resolve(3);
}
async function consumeAsyncGenerator() {
const asyncGenerator = createAsyncGenerator();
for await (const value of asyncGenerator) {
console.log(value); // 1, 2, 3
}
}
consumeAsyncGenerator();
异步生成器函数的特点:
- 可以使用
yield
产生Promise或普通值 - 可以在函数内部使用
await
- 返回的对象实现了异步迭代器协议
实现延迟计算
异步生成器可以实现延迟计算和处理,只在需要时才执行异步操作:
async function* lazyFetch(urls) {
for (const url of urls) {
// 只有迭代到这个URL时才会执行fetch
const response = await fetch(url);
const data = await response.json();
yield data;
}
}
async function processUrls() {
const urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3'
];
const fetcher = lazyFetch(urls);
// 每次迭代才会执行一个fetch操作
for await (const data of fetcher) {
console.log('处理数据:', data);
// 可以在这里决定是否继续迭代
if (someCondition(data)) {
break; // 提前终止迭代
}
}
}
异步生成器方法
可以在类中定义异步生成器方法:
class DataSource {
constructor(api) {
this.api = api;
}
async *getItems(query) {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await this.api.fetch(`/items?query=${query}&page=${page}`);
const { items, pagination } = await response.json();
// 为每个项目生成一个值
for (const item of items) {
yield item;
}
// 检查是否有更多页
hasMore = pagination.hasNextPage;
page++;
}
}
}
// 使用异步生成器方法
async function searchItems(query) {
const dataSource = new DataSource(api);
for await (const item of dataSource.getItems(query)) {
console.log('找到项目:', item);
}
}
实际应用场景
处理流数据
异步迭代器非常适合处理流数据,如文件流或网络流:
// 使用Node.js的Readable流
const fs = require('fs');
const { createReadStream } = fs;
async function processFileStream(filePath) {
const stream = createReadStream(filePath, { encoding: 'utf8', highWaterMark: 1024 });
// Node.js的Readable流实现了Symbol.asyncIterator
let lineBuffer = '';
let lineCount = 0;
for await (const chunk of stream) {
// 处理每个数据块
lineBuffer += chunk;
// 按行处理数据
const lines = lineBuffer.split('\n');
lineBuffer = lines.pop(); // 保留最后一个可能不完整的行
for (const line of lines) {
lineCount++;
console.log(`行 ${lineCount}: ${line.substring(0, 50)}...`);
}
}
// 处理最后一行
if (lineBuffer.length > 0) {
lineCount++;
console.log(`行 ${lineCount}: ${lineBuffer.substring(0, 50)}...`);
}
console.log(`总共处理了 ${lineCount} 行`);
}
processFileStream('large-file.txt');
分页API请求
异步生成器非常适合处理分页API请求:
async function* fetchPaginatedData(baseUrl, pageSize = 10) {
let page = 1;
let hasMore = true;
while (hasMore) {
const url = `${baseUrl}?page=${page}&pageSize=${pageSize}`;
const response = await fetch(url);
if (!response.ok) {
throw new Error(`API请求失败: ${response.status}`);
}
const data = await response.json();
// 生成当前页的数据
yield data.items;
// 检查是否有更多页
hasMore = data.hasNextPage;
page++;
}
}
async function getAllUsers() {
try {
const allUsers = [];
// 使用for-await-of迭代所有页
for await (const users of fetchPaginatedData('https://api.example.com/users')) {
console.log(`获取到${users.length}个用户`);
allUsers.push(...users);
}
console.log(`总共获取到${allUsers.length}个用户`);
return allUsers;
} catch (error) {
console.error('获取用户失败:', error);
throw error;
}
}
事件流处理
异步迭代器可以用于处理事件流:
// 将事件转换为异步迭代器
function eventToAsyncIterator(target, eventName) {
const events = [];
const listeners = [];
return {
[Symbol.asyncIterator]() {
// 设置事件监听器
const listener = (event) => {
// 如果有等待的Promise,解析它
if (listeners.length > 0) {
const resolve = listeners.shift();
resolve({ value: event, done: false });
} else {
// 否则将事件存储起来
events.push(event);
}
};
target.addEventListener(eventName, listener);
return {
next() {
// 如果有存储的事件,立即返回
if (events.length > 0) {
const event = events.shift();
return Promise.resolve({ value: event, done: false });
}
// 否则返回一个Promise,等待下一个事件
return new Promise(resolve => {
listeners.push(resolve);
});
},
return() {
// 清理事件监听器
target.removeEventListener(eventName, listener);
return Promise.resolve({ done: true });
}
};
}
};
}
// 使用示例 - 处理点击事件
async function handleClicks() {
const clickIterator = eventToAsyncIterator(document, 'click');
try {
console.log('开始监听点击事件,点击5次后结束');
let count = 0;
for await (const event of clickIterator) {
count++;
console.log(`点击 #${count} 在坐标 (${event.clientX}, ${event.clientY})`);
if (count >= 5) {
break; // 5次点击后停止迭代
}
}
console.log('点击事件监听结束');
} catch (error) {
console.error('处理点击事件出错:', error);
}
}
并发控制
异步迭代器可以用于实现并发控制:
async function* processWithConcurrencyLimit(items, processor, concurrency = 3) {
// 创建一个任务队列
const queue = [...items];
const inProgress = new Map();
const results = [];
// 处理队列中的任务
while (queue.length > 0 || inProgress.size > 0) {
// 填充进行中的任务,直到达到并发限制
while (queue.length > 0 && inProgress.size < concurrency) {
const item = queue.shift();
const index = items.indexOf(item);
// 创建处理Promise
const promise = (async () => {
try {
return await processor(item);
} finally {
inProgress.delete(index);
}
})();
inProgress.set(index, promise);
}
// 等待任何一个任务完成
if (inProgress.size > 0) {
const [index, completedPromise] = [...inProgress.entries()][0];
const result = await completedPromise;
results[index] = result;
// 生成结果
yield { item: items[index], result };
}
}
}
// 使用示例
async function processBatchWithLimit() {
const items = Array.from({ length: 10 }, (_, i) => `item-${i}`);
// 模拟处理函数
const processor = async (item) => {
const delay = Math.random() * 2000 + 1000;
await new Promise(resolve => setTimeout(resolve, delay));
return `processed-${item}`;
};
console.log('开始处理项目,最大并发数: 3');
for await (const { item, result } of processWithConcurrencyLimit(items, processor, 3)) {
console.log(`完成: ${item} -> ${result}`);
}
console.log('所有项目处理完成');
}
异步迭代器的高级技巧
自定义异步迭代器
除了使用异步生成器函数,我们还可以手动实现异步迭代器:
function createAsyncNumberIterator(start = 0, end = 10, delay = 1000) {
let current = start;
return {
[Symbol.asyncIterator]() {
return {
async next() {
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, delay));
if (current <= end) {
return { value: current++, done: false };
} else {
return { done: true };
}
},
// 可选的return方法,用于清理资源
async return() {
console.log('迭代器提前终止');
return { done: true };
},
// 可选的throw方法,用于处理错误
async throw(error) {
console.error('迭代器错误:', error);
return { done: true };
}
};
}
};
}
// 使用自定义异步迭代器
async function useCustomIterator() {
const asyncNumbers = createAsyncNumberIterator(1, 5, 500);
try {
for await (const num of asyncNumbers) {
console.log(`异步数字: ${num}`);
// 模拟提前终止迭代
if (num === 3) {
break; // 这将调用return()方法
}
}
} catch (error) {
console.error('迭代过程中出错:', error);
}
}
异步迭代器的组合与转换
我们可以创建工具函数来组合和转换异步迭代器:
// 映射异步迭代器的值
async function* mapAsyncIterable(asyncIterable, mapFn) {
for await (const item of asyncIterable) {
yield mapFn(item);
}
}
// 过滤异步迭代器的值
async function* filterAsyncIterable(asyncIterable, filterFn) {
for await (const item of asyncIterable) {
if (filterFn(item)) {
yield item;
}
}
}
// 限制异步迭代器的数量
async function* takeAsyncIterable(asyncIterable, limit) {
let count = 0;
for await (const item of asyncIterable) {
if (count >= limit) break;
yield item;
count++;
}
}
// 组合多个异步迭代器
async function* combineAsyncIterables(...asyncIterables) {
for (const asyncIterable of asyncIterables) {
for await (const item of asyncIterable) {
yield item;
}
}
}
// 使用示例
async function transformAsyncData() {
// 创建一个基础异步迭代器
async function* generateData() {
for (let i = 1; i <= 10; i++) {
await new Promise(resolve => setTimeout(resolve, 300));
yield i;
}
}
// 应用转换
const data = generateData();
const filtered = filterAsyncIterable(data, x => x % 2 === 0); // 只保留偶数
const mapped = mapAsyncIterable(filtered, x => x * 10); // 将每个值乘以10
const limited = takeAsyncIterable(mapped, 3); // 只取前3个值
// 消费转换后的迭代器
for await (const item of limited) {
console.log(`转换后的值: ${item}`);
}
}
异步迭代器与可观察对象
异步迭代器可以与可观察对象(Observable)结合使用:
// 将Observable转换为异步迭代器
function observableToAsyncIterable(observable) {
const queue = [];
let complete = false;
let error = null;
let resolveNext = null;
// 订阅Observable
const subscription = observable.subscribe({
next(value) {
if (resolveNext) {
resolveNext({ value, done: false });
resolveNext = null;
} else {
queue.push(value);
}
},
error(err) {
error = err;
if (resolveNext) {
resolveNext(Promise.reject(err));
resolveNext = null;
}
},
complete() {
complete = true;
if (resolveNext) {
resolveNext({ done: true });
resolveNext = null;
}
}
});
// 返回异步可迭代对象
return {
[Symbol.asyncIterator]() {
return {
async next() {
if (queue.length > 0) {
const value = queue.shift();
return { value, done: false };
}
if (complete) {
return { done: true };
}
if (error) {
return Promise.reject(error);
}
// 等待下一个值
return new Promise(resolve => {
resolveNext = resolve;
});
},
async return() {
subscription.unsubscribe();
return { done: true };
}
};
}
};
}
// 使用示例(需要RxJS库)
async function useObservableAsAsyncIterable() {
// 创建一个简单的Observable
const { interval, take } = rxjs;
const observable = interval(1000).pipe(take(5));
// 转换为异步迭代器
const asyncIterable = observableToAsyncIterable(observable);
// 使用for-await-of消费
for await (const value of asyncIterable) {
console.log(`从Observable接收到: ${value}`);
}
console.log('Observable完成');
}
错误处理与资源管理
异步迭代器中的错误处理
异步迭代器中的错误处理需要特别注意:
async function* errorProneGenerator() {
try {
yield 1;
// 模拟异步操作中的错误
await new Promise((_, reject) => setTimeout(() => reject(new Error('操作失败')), 500));
// 这行不会执行
yield 2;
} catch (error) {
// 捕获并处理错误
console.error('生成器内部捕获错误:', error);
// 可以选择继续生成值
yield '错误后的恢复值';
} finally {
// 清理资源
console.log('生成器清理资源');
}
}
async function handleGeneratorErrors() {
try {
const generator = errorProneGenerator();
for await (const value of generator) {
console.log(`接收到值: ${value}`);
}
} catch (error) {
// 捕获未在生成器内部处理的错误
console.error('外部捕获错误:', error);
}
}
资源管理与清理
使用异步迭代器时,确保正确清理资源非常重要:
class DatabaseConnection {
constructor(connectionString) {
this.connectionString = connectionString;
this.isConnected = false;
}
async connect() {
console.log(`连接到数据库: ${this.connectionString}`);
await new Promise(resolve => setTimeout(resolve, 500));
this.isConnected = true;
}
async disconnect() {
if (this.isConnected) {
console.log('断开数据库连接');
await new Promise(resolve => setTimeout(resolve, 300));
this.isConnected = false;
}
}
// 异步生成器方法,用于查询数据
async *queryBatch(query, batchSize = 2) {
if (!this.isConnected) {
await this.connect();
}
try {
// 模拟数据库中的记录
const allRecords = [
{ id: 1, name: '记录1' },
{ id: 2, name: '记录2' },
{ id: 3, name: '记录3' },
{ id: 4, name: '记录4' },
{ id: 5, name: '记录5' }
];
// 按批次生成记录
for (let i = 0; i < allRecords.length; i += batchSize) {
const batch = allRecords.slice(i, i + batchSize);
console.log(`获取批次: ${i / batchSize + 1}`);
// 模拟查询延迟
await new Promise(resolve => setTimeout(resolve, 500));
yield batch;
}
} finally {
// 注意:这里不要断开连接,因为迭代器可能会被重用
// 断开连接应该在迭代完成后手动进行
}
}
}
async function useDatabaseConnection() {
const db = new DatabaseConnection('mongodb://example.com/mydb');
try {
// 使用for-await-of迭代查询结果
for await (const batch of db.queryBatch('SELECT * FROM users')) {
console.log('处理批次:', batch);
}
} finally {
// 确保在完成后断开连接
await db.disconnect();
}
}
性能考虑与最佳实践
性能优化
使用异步迭代器时的性能优化技巧:
// 1. 避免不必要的异步操作
async function* efficientGenerator() {
// 不好的做法:每次迭代都有不必要的await
for (let i = 0; i < 1000; i++) {
yield await Promise.resolve(i); // 不必要的await
}
}
async function* betterGenerator() {
// 更好的做法:只在必要时使用await
for (let i = 0; i < 1000; i++) {
yield i; // 直接生成值,异步生成器会自动包装
}
// 只在真正需要异步操作时使用await
const data = await fetchData();
yield data;
}
// 2. 批量处理以减少异步开销
async function* batchProcessor(items, batchSize = 100) {
// 将项目分成批次
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
// 一次性处理整个批次
const results = await Promise.all(batch.map(processItem));
// 生成批处理结果
yield results;
}
}
最佳实践
使用异步迭代器的最佳实践:
// 1. 提供清理方法
async function* properCleanupGenerator() {
const resource = await acquireResource();
try {
yield* resource.getData();
} finally {
// 确保资源被释放
await resource.release();
}
}
// 2. 处理提前终止
function createRobustAsyncIterator() {
let resource = null;
return {
[Symbol.asyncIterator]() {
return {
async next() {
if (!resource) {
resource = await acquireResource();
}
try {
const result = await resource.getNext();
return result;
} catch (error) {
// 确保在错误时释放资源
if (resource) {
await resource.release();
resource = null;
}
throw error;
}
},
// 处理break语句或提前终止
async return() {
if (resource) {
await resource.release();
resource = null;
}
return { done: true };
}
};
}
};
}
// 3. 避免无限迭代器没有终止条件
async function* safeInfiniteGenerator() {
let count = 0;
const MAX_ITERATIONS = 1000; // 安全上限
while (true) {
if (count >= MAX_ITERATIONS) {
console.warn(`达到最大迭代次数 (${MAX_ITERATIONS})`);
return; // 终止生成器
}
yield count++;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
浏览器和Node.js中的异步迭代器
浏览器中的异步迭代器
浏览器环境中使用异步迭代器的示例:
// 使用Fetch API和异步迭代器处理流数据
async function* fetchAsStream(url) {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP错误: ${response.status}`);
}
if (!response.body) {
throw new Error('ReadableStream不可用');
}
const reader = response.body.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
}
// 使用示例 - 下载大文件并显示进度
async function downloadWithProgress(url) {
let receivedBytes = 0;
const chunks = [];
try {
for await (const chunk of fetchAsStream(url)) {
chunks.push(chunk);
receivedBytes += chunk.length;
// 更新进度
console.log(`已接收 ${receivedBytes} 字节`);
updateProgressBar(receivedBytes);
}
// 合并所有块
const allBytes = new Uint8Array(receivedBytes);
let position = 0;
for (const chunk of chunks) {
allBytes.set(chunk, position);
position += chunk.length;
}
console.log('下载完成,总大小:', receivedBytes);
return allBytes;
} catch (error) {
console.error('下载出错:', error);
throw error;
}
}
Node.js中的异步迭代器
Node.js环境中使用异步迭代器的示例:
// Node.js中使用异步迭代器处理流
const fs = require('fs');
const { createReadStream } = fs;
const { createInterface } = require('readline');
// 按行读取文件的异步迭代器
async function* readFileLineByLine(filePath) {
const fileStream = createReadStream(filePath, { encoding: 'utf8' });
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
try {
// readline.Interface已经实现了Symbol.asyncIterator
for await (const line of rl) {
yield line;
}
} finally {
// 确保资源被释放
rl.close();
fileStream.close();
}
}
// 使用示例 - 处理大型日志文件
async function processLogFile(logPath) {
let lineCount = 0;
let errorCount = 0;
console.time('处理日志');
try {
for await (const line of readFileLineByLine(logPath)) {
lineCount++;
// 分析日志行
if (line.includes('ERROR')) {
errorCount++;
console.log(`发现错误 (${errorCount})`, line.substring(0, 80) + '...');
}
// 定期报告进度
if (lineCount % 10000 === 0) {
console.log(`已处理 ${lineCount} 行...`);
}
}
console.log(`日志分析完成: 总共 ${lineCount} 行, ${errorCount} 个错误`);
} catch (error) {
console.error('处理日志文件出错:', error);
} finally {
console.timeEnd('处理日志');
}
}
// 使用Node.js的流转换
const { Transform } = require('stream');
// 创建一个转换流并将其作为异步迭代器使用
function createTransformIterator(inputStream, transformFn) {
const transform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
// 应用转换函数
const result = transformFn(chunk.toString());
callback(null, result);
} catch (error) {
callback(error);
}
}
});
// 连接输入流到转换流
inputStream.pipe(transform);
// 返回转换流作为异步迭代器
return transform;
}
// 使用示例 - 处理并转换CSV数据
async function processCSV(csvPath) {
const fileStream = createReadStream(csvPath, { encoding: 'utf8' });
// 创建一个转换迭代器,将CSV行解析为对象
const rowIterator = createTransformIterator(fileStream, line => {
const [id, name, value] = line.split(',');
return { id, name, value: parseFloat(value) };
});
try {
for await (const row of rowIterator) {
console.log('处理行:', row);
// 进一步处理每一行...
}
} catch (error) {
console.error('处理CSV出错:', error);
} finally {
fileStream.close();
}
}
与其他JavaScript特性结合
与解构赋值结合
异步迭代器可以与解构赋值结合使用:
async function* generateUserData() {
// 模拟从不同API获取用户数据
const profile = await fetchUserProfile(userId);
yield { type: 'profile', data: profile };
const posts = await fetchUserPosts(userId);
yield { type: 'posts', data: posts };
const followers = await fetchUserFollowers(userId);
yield { type: 'followers', data: followers };
}
async function processUserData() {
const userDataGenerator = generateUserData();
// 使用解构赋值获取生成的值
const { value: profileData } = await userDataGenerator.next();
console.log('用户资料:', profileData.data);
const { value: postsData } = await userDataGenerator.next();
console.log(`用户有 ${postsData.data.length} 篇文章`);
const { value: followersData } = await userDataGenerator.next();
console.log(`用户有 ${followersData.data.length} 个关注者`);
}
与async/await结合
异步迭代器与async/await结合使用可以创建强大的异步处理流程:
async function processDataSequence() {
// 创建一个异步数据源
async function* dataSource() {
for (let i = 1; i <= 3; i++) {
// 模拟异步数据获取
const data = await fetchData(i);
yield data;
}
}
// 使用for-await-of和async/await处理数据
for await (const data of dataSource()) {
// 对每个数据项执行异步处理
const processedData = await processData(data);
// 异步保存结果
await saveResult(processedData);
console.log('数据处理并保存完成');
}
}
与Promise方法结合
异步迭代器可以与Promise的各种方法结合使用:
async function* fetchConcurrently(urls, concurrency = 3) {
// 将URL分成批次
for (let i = 0; i < urls.length; i += concurrency) {
const batch = urls.slice(i, i + concurrency);
// 并行获取一批URL
const results = await Promise.all(
batch.map(url => fetch(url).then(r => r.json()))
);
// 逐个生成结果
for (const result of results) {
yield result;
}
}
}
// 使用Promise.race与异步迭代器
async function* raceWithTimeout(asyncIterable, timeout = 5000) {
for await (const item of asyncIterable) {
// 为每个项目创建一个超时Promise
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('操作超时')), timeout);
});
try {
// 使用Promise.race在超时前处理项目
const processedItem = await Promise.race([
processItem(item),
timeoutPromise
]);
yield processedItem;
} catch (error) {
console.warn('处理项目超时,跳过:', item);
continue;
}
}
}
实际应用示例
实时数据处理
使用异步迭代器处理实时数据流:
// 使用WebSocket创建实时数据流
function createWebSocketStream(url) {
const ws = new WebSocket(url);
const messages = [];
const resolvers = [];
let closed = false;
ws.addEventListener('message', event => {
const data = JSON.parse(event.data);
if (resolvers.length > 0) {
const resolve = resolvers.shift();
resolve({ value: data, done: false });
} else {
messages.push(data);
}
});
ws.addEventListener('close', () => {
closed = true;
// 解析所有等待的Promise
for (const resolve of resolvers) {
resolve({ done: true });
}
resolvers.length = 0;
});
ws.addEventListener('error', error => {
console.error('WebSocket错误:', error);
ws.close();
});
return {
[Symbol.asyncIterator]() {
return {
async next() {
if (messages.length > 0) {
return { value: messages.shift(), done: false };
}
if (closed) {
return { done: true };
}
return new Promise(resolve => {
resolvers.push(resolve);
});
},
async return() {
ws.close();
return { done: true };
}
};
}
};
}
// 使用示例 - 处理实时市场数据
async function processMarketData() {
const marketDataStream = createWebSocketStream('wss://api.example.com/market-data');
try {
console.log('开始接收市场数据...');
for await (const data of marketDataStream) {
console.log(`收到 ${data.symbol} 价格更新: ${data.price}`);
// 根据价格变化执行操作
if (data.price > someThreshold) {
console.log(`${data.symbol} 价格超过阈值,执行操作...`);
await executeAction(data);
}
}
} catch (error) {
console.error('处理市场数据出错:', error);
} finally {
console.log('市场数据流已关闭');
}
}
数据批处理
使用异步迭代器进行大数据批处理:
// 批量处理大型数据集
async function* batchProcessor(dataSource, batchSize = 1000) {
let batch = [];
for await (const item of dataSource) {
batch.push(item);
if (batch.length >= batchSize) {
yield batch;
batch = [];
}
}
// 处理最后一个不完整的批次
if (batch.length > 0) {
yield batch;
}
}
// 使用示例 - 批量处理用户数据
async function processUserRecords() {
// 假设这是一个返回大量用户记录的异步迭代器
const userRecords = getUserRecordsIterator();
let totalProcessed = 0;
let batchCount = 0;
console.time('批处理完成');
for await (const batch of batchProcessor(userRecords, 500)) {
batchCount++;
console.log(`处理批次 #${batchCount},包含 ${batch.length} 条记录`);
// 并行处理批次中的所有记录
await Promise.all(batch.map(async user => {
await processUserRecord(user);
}));
totalProcessed += batch.length;
console.log(`已处理 ${totalProcessed} 条记录`);
}
console.timeEnd('批处理完成');
console.log(`总共处理了 ${totalProcessed} 条用户记录,分为 ${batchCount} 个批次`);
}
数据管道处理
使用异步迭代器创建数据处理管道:
// 创建数据处理管道
async function* createProcessingPipeline(source, ...processors) {
// 应用所有处理器
let stream = source;
for (const processor of processors) {
stream = processor(stream);
}
// 生成最终结果
for await (const item of stream) {
yield item;
}
}
// 示例处理器
function filterProcessor(predicate) {
return async function* (source) {
for await (const item of source) {
if (predicate(item)) {
yield item;
}
}
};
}
function mapProcessor(transform) {
return async function* (source) {
for await (const item of source) {
yield transform(item);
}
};
}
function batchProcessor(size) {
return async function* (source) {
let batch = [];
for await (const item of source) {
batch.push(item);
if (batch.length >= size) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
};
}
// 使用示例 - 创建数据处理管道
async function runDataPipeline() {
// 创建数据源
async function* dataSource() {
for (let i = 1; i <= 100; i++) {
yield { id: i, value: Math.random() * 100 };
}
}
// 创建处理管道
const pipeline = createProcessingPipeline(
dataSource(),
filterProcessor(item => item.value > 50), // 只保留value > 50的项目
mapProcessor(item => ({ ...item, processed: true, value: Math.round(item.value) })), // 转换项目
batchProcessor(10) // 按10个一批分组
);
// 消费处理管道
for await (const batch of pipeline) {
console.log(`处理批次,包含 ${batch.length} 项:`);
console.log(batch);
// 进一步处理批次...
await processBatch(batch);
}
}
总结
异步迭代器是JavaScript中处理异步数据流的强大工具,它们提供了以下优势:
简化异步数据处理:使用类似同步代码的方式处理异步数据序列。
按需处理:只在需要时才获取和处理数据,提高内存效率。
自然的错误处理:使用标准的try/catch语句处理异步错误。
资源管理:通过finally块和return方法确保资源正确释放。
与现有JavaScript特性集成:可以与Promise、async/await、解构赋值等特性无缝结合。
异步迭代器特别适合以下场景:
- 处理流数据(文件流、网络流)
- 分页API请求
- 事件流处理
- 实时数据处理
- 大数据批处理
通过掌握本文介绍的异步迭代器概念和技术,您可以更有效地处理各种异步数据流,编写更清晰、更高效的异步JavaScript代码。