目录
一、为什么选择 Bun 做流处理?
在 Node.js 生态里,流处理长期依赖 stream 模块——功能强大,但 API 繁琐,初学者容易被 EventEmitter、背压、pipe 陷阱坑到怀疑人生。2023 年后,Web Streams API(ReadableStream / WritableStream / TransformStream)成为跨运行时标准,浏览器、Deno、Node.js 18+ 以及 Bun 都已原生支持。
Bun 在流处理方面有三个关键优势:
| 特性 | Node.js | Bun |
|---|---|---|
| TypeScript 支持 | 需要 ts-node / tsc 编译 | 原生执行,零编译步骤 |
| Web Streams API | v18+ 实验性支持 | 内置,全面兼容 WHATWG 规范 |
| 文件 I/O | fs/promises + ReadableStream 转换 | Bun.file() 直接返回流 |
| 启动时间 | ~80ms(简单脚本) | ~5ms |
| 包管理 | npm(慢) | 内置,比 npm 快 20-30x |
tsconfig.json 额外配置,直接运行 bun run xxx.ts 即可。二、环境搭建与项目初始化
安装 Bun
如果还没安装 Bun,一条命令搞定:
# macOS / Linux
curl -fsSL https://bun.sh/install | bash
# Windows(推荐 WSL2,或使用 PowerShell)
powershell -c "irm bun.sh/install.ps1 | iex"
# 验证安装
bun --version # 应输出 1.1.x 或更高
Shell
初始化项目
mkdir bun-streams-demo && cd bun-streams-demo
bun init -y
# 安装 Bun 的 TypeScript 类型声明(可选,IDE 提示更友好)
bun add -d @types/bun
Shell
此时目录结构如下:
bun-streams-demo/
├── index.ts # 入口文件
├── package.json
└── tsconfig.json # bun init 自动生成,已预配置
Text
tsconfig.json 中的 paths、strict、target 等设置。对于流处理项目,默认配置足够,无需修改。三、ReadableStream:数据源的创建与消费
ReadableStream 是整个流处理体系的"源头"。理解它的工作原理是后续所有内容的基础。
3.1 基础用法:创建一个数值流
新建文件 01-readable.ts:
// 01-readable.ts
const numberStream = new ReadableStream<number>({
async pull(controller) {
for (let i = 1; i <= 5; i++) {
controller.enqueue(i);
// 模拟异步数据产生(如 DB 查询、网络请求)
await new Promise((r) => setTimeout(r, 100));
}
controller.close();
},
});
// 推荐:使用 for await...of 消费流(最简洁的方式)
for await (const value of numberStream) {
console.log("Received:", value);
}
// 输出:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
TypeScript
bun run 01-readable.ts
Shell
3.2 使用 Reader 手动控制读取
有时需要更精细的控制——按需拉取,而非自动消费:
// 手动 reader 控制
const stream = new ReadableStream<string>({
start(controller) {
["chunk-1", "chunk-2", "chunk-3"].forEach((c) => controller.enqueue(c));
controller.close();
},
});
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log("Read:", value);
}
} finally {
// ⚠️ 务必释放锁,否则流无法被其他消费者使用
reader.releaseLock();
}
TypeScript
getReader() 后必须调用 releaseLock(),否则流被锁定,后续的 pipeTo()、pipeThrough() 都会报错。建议用 try/finally 确保释放。四、WritableStream:背压控制与安全写入
WritableStream 是数据的"终点"。最关键的特性是背压(Backpressure)——当下游处理速度跟不上上游产出时,自动暂停上游,防止内存无限增长。
4.1 实现带背压的写入端
新建 02-writable.ts:
// 02-writable.ts
const slowWriter = new WritableStream<Uint8Array>({
start() {
console.log("Writer started");
},
async write(chunk, _controller) {
const text = new TextDecoder().decode(chunk);
console.log(`Processing chunk: "${text}"`);
// 模拟耗时操作(如写磁盘、调用 API)
await new Promise((r) => setTimeout(r, 200));
// write() 返回 Promise,流会等待它 resolve 后再接收下一个 chunk
// 这就是背压机制的核心!
},
close() {
console.log("Writer closed, all chunks processed");
},
abort(reason) {
console.error("Writer aborted:", reason);
},
});
const encoder = new TextEncoder();
const writer = slowWriter.getWriter();
try {
await writer.write(encoder.encode("Hello"));
await writer.write(encoder.encode("World"));
await writer.write(encoder.encode("Bun Streams"));
await writer.close();
} finally {
writer.releaseLock();
}
TypeScript
4.2 背压感知的高吞吐写入
当写入大量数据时,应检测 desiredSize 来感知背压状态:
const writer = writable.getWriter();
async function writeWithBackpressure(data: string[]) {
for (const item of data) {
const ready = writer.ready; // 等待背压释放
if (ready) await ready;
await writer.write(new TextEncoder().encode(item));
}
await writer.close();
}
TypeScript
五、TransformStream:数据转换管道
TransformStream 是流处理体系的"中间件"——接收上游数据,变换后输出给下游。它同时持有一个 ReadableStream(输出侧)和一个 WritableStream(输入侧),通过 pipeThrough() 串联在管道中。
5.1 文本转大写(基础示例)
// 03-transform.ts
function createUpperCaseTransform() {
return new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
const text = new TextDecoder().decode(chunk);
controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
},
});
}
const input = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("hello "));
controller.enqueue(new TextEncoder().encode("bun streams"));
controller.close();
},
});
const transformed = input.pipeThrough(createUpperCaseTransform());
// 用 Response 帮助将流收集为字符串(Bun 内置支持)
const result = await new Response(transformed).text();
console.log(result); // "HELLO BUN STREAMS"
TypeScript
5.2 JSON 解析转换流
实际场景中,常需要将原始字节流解析为结构化数据。以下实现一个 NDJSON(Newline Delimited JSON)解析器:
// NDJSON 解析:每行一个 JSON 对象
function createNDJSONParser<T>() {
let buffer = "";
return new TransformStream<string, T>({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split("\n");
// 最后一段可能不完整,保留到下一个 chunk
buffer = lines.pop() ?? "";
for (const line of lines) {
if (line.trim()) {
try {
controller.enqueue(JSON.parse(line) as T);
} catch (e) {
controller.error(new Error(`JSON parse failed: ${line}`));
}
}
}
},
flush(controller) {
// 处理最后一行(无换行符结尾)
if (buffer.trim()) {
controller.enqueue(JSON.parse(buffer));
}
},
});
}
// 使用示例
type LogEntry = { level: string; message: string; ts: number };
const ndjsonStream = new ReadableStream<string>({
start(c) {
c.enqueue('{"level":"info","message":"Server started","ts":1709000000}\n');
c.enqueue('{"level":"warn","message":"High memory","ts":1709000001}\n');
c.enqueue('{"level":"error","message":"DB timeout","ts":1709000002}');
c.close();
},
});
for await (const entry of ndjsonStream.pipeThrough(createNDJSONParser<LogEntry>())) {
console.log(`[${entry.level.toUpperCase()}] ${entry.message}`);
}
TypeScript
六、链式管道:构建复杂处理流
真实的流处理系统往往需要多个变换步骤串联。Bun 的 pipeThrough() 支持无限链式组合:
ReadableStream
DecompressionStream
TextDecoderStream
TransformStream
WritableStream
6.1 日志处理管道实战
新建 04-pipeline.ts,实现一个完整的日志处理管道:读取原始日志 → 解析 NDJSON → 过滤错误级别 → 格式化输出:
// 04-pipeline.ts
// 通用工具:创建过滤转换流
function createFilter<T>(predicate: (item: T) => boolean) {
return new TransformStream<T, T>({
transform(chunk, controller) {
if (predicate(chunk)) controller.enqueue(chunk);
},
});
}
// 通用工具:创建映射转换流
function createMap<T, U>(mapper: (item: T) => U) {
return new TransformStream<T, U>({
transform(chunk, controller) {
controller.enqueue(mapper(chunk));
},
});
}
type LogEntry = { level: string; message: string; ts: number };
// 模拟日志数据源(真实场景可替换为文件流或网络流)
const rawLogSource = new ReadableStream<string>({
start(c) {
const logs: LogEntry[] = [
{ level: "debug", message: "Cache hit", ts: 1709000000 },
{ level: "info", message: "Request handled", ts: 1709000001 },
{ level: "error", message: "DB connection failed", ts: 1709000002 },
{ level: "warn", message: "Memory usage 85%", ts: 1709000003 },
{ level: "error", message: "Timeout after 30s", ts: 1709000004 },
];
logs.forEach((log) => c.enqueue(JSON.stringify(log) + "\n"));
c.close();
},
});
// 构建处理管道:解析 → 过滤 error → 格式化
const errorAlerts = rawLogSource
.pipeThrough(createNDJSONParser<LogEntry>())
.pipeThrough(createFilter((e) => e.level === "error"))
.pipeThrough(
createMap((e) => ({
...e,
formatted: `🚨 [${new Date(e.ts * 1000).toISOString()}] ${e.message}`,
}))
);
for await (const alert of errorAlerts) {
console.log(alert.formatted);
}
// 输出:
// 🚨 [2024-02-27T10:13:22.000Z] DB connection failed
// 🚨 [2024-02-27T10:13:24.000Z] Timeout after 30s
TypeScript
createFilter、createMap),可以在不同管道中复用,也更容易单独测试。七、文件流:高效处理大文件
处理大文件时,绝对不能一次性加载到内存。Bun 的 Bun.file() 直接返回流兼容的 BunFile 对象,配合 .stream() 方法实现真正的惰性读取。
7.1 流式读取大 CSV 文件
// 05-file-stream.ts
// 创建测试文件(真实场景替换为实际路径)
await Bun.write("data.csv", [
"id,name,score",
"1,Alice,95",
"2,Bob,78",
"3,Charlie,88",
"4,Diana,92",
].join("\n"));
// 流式读取并逐行处理(无论文件多大,内存使用恒定)
const file = Bun.file("data.csv");
const stream = file.stream();
// 用内置的 TextDecoderStream 解码字节流
const textStream = stream.pipeThrough(new TextDecoderStream());
let isFirstLine = true;
let buffer = "";
const results: { id: number; name: string; score: number }[] = [];
for await (const chunk of textStream) {
buffer += chunk;
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (isFirstLine) { isFirstLine = false; continue; } // skip header
const [id, name, score] = line.split(",");
if (id) results.push({ id: Number(id), name, score: Number(score) });
}
}
// 处理最后一行
if (buffer.trim()) {
const [id, name, score] = buffer.split(",");
results.push({ id: Number(id), name, score: Number(score) });
}
console.log("Top scorers:", results.filter((r) => r.score >= 90));
// [{ id: 1, name: 'Alice', score: 95 }, { id: 4, name: 'Diana', score: 92 }]
TypeScript
7.2 流式复制并转换(管道到文件)
// 流式读取源文件 → 转换 → 写入目标文件
const sourceStream = Bun.file("data.csv").stream();
const upperTransform = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
const text = new TextDecoder().decode(chunk);
controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
},
});
// Bun.file().writer() 返回 WritableStream 兼容对象
const dest = Bun.file("data-upper.csv").writer();
await sourceStream
.pipeThrough(upperTransform)
.pipeTo(dest);
console.log("Done! Check data-upper.csv");
TypeScript
Bun.file().writer() 返回的是 FileSink,实现了 UnderlyingSink 接口,可以直接作为 pipeTo() 的目标,内部使用零拷贝 I/O,性能极佳。八、进阶优化与生产实践
8.1 使用 Bun.ArrayBufferSink 高性能缓冲
需要将流数据收集到内存时,Bun.ArrayBufferSink 比手动拼接 Uint8Array 快很多:
// 高性能缓冲收集
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 64, // 预分配 64KB 缓冲区
stream: false, // false = 一次性收集模式
});
const stream = Bun.file("large-file.bin").stream();
for await (const chunk of stream) {
sink.write(chunk);
}
const buffer = sink.end() as ArrayBuffer;
console.log(`Collected ${buffer.byteLength} bytes`);
TypeScript
8.2 错误处理与流取消
生产系统必须优雅处理错误。以下是推荐的错误处理模式:
// 带错误恢复的管道执行器
async function runPipeline<T>(
source: ReadableStream<T>,
transforms: TransformStream<any, any>[],
destination: WritableStream<any>
): Promise<{ success: boolean; error?: string }> {
let current: ReadableStream = source;
for (const transform of transforms) {
current = current.pipeThrough(transform);
}
try {
await current.pipeTo(destination);
return { success: true };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error("Pipeline failed:", message);
return { success: false, error: message };
}
}
// 带超时的流读取(防止流卡死)
async function readWithTimeout<T>(
stream: ReadableStream<T>,
timeoutMs: number
): Promise<T[]> {
const results: T[] = [];
const abortController = new AbortController();
const timer = setTimeout(() => abortController.abort(), timeoutMs);
try {
for await (const item of stream) {
if (abortController.signal.aborted) break;
results.push(item);
}
} finally {
clearTimeout(timer);
}
return results;
}
TypeScript
8.3 Bun 内置流性能数据
vs Node.js 80ms
vs npm
编译步骤
增量(流模式)
8.4 生产最佳实践清单
常见问题解答
Readable 实现了 async iterable 接口,可以直接传给 new Response(readable) 或用 for await...of 消费。反方向(Web Stream → Node.js Readable)需要用 Readable.fromWeb(webStream)。尽量统一使用 Web Streams API,避免混用带来的复杂性。stream.pipeThrough(createTap(chunk => console.log('[DEBUG]', chunk))),其中 createTap 在 transform 中 enqueue 原始 chunk 不做修改。这样不影响管道逻辑,又能观察数据流经过每个阶段的状态。bun test,天然支持 async/await。测试流时,将 ReadableStream 数据收集到数组后断言,或将输出 pipeTo 一个自定义 WritableStream 来捕获结果,非常简洁。controller.close())后,所有 chunk 处理完毕,WritableStream 端发送 close 信号,此时 flush() 被调用。适合处理缓冲区中剩余的不完整数据(如 NDJSON 最后一行没有换行符的情况)。