一、为什么选择 Bun 做流处理?

Bun 流处理系统架构概览

在 Node.js 生态里,流处理长期依赖 stream 模块——功能强大,但 API 繁琐,初学者容易被 EventEmitter、背压、pipe 陷阱坑到怀疑人生。2023 年后,Web Streams API(ReadableStream / WritableStream / TransformStream)成为跨运行时标准,浏览器、Deno、Node.js 18+ 以及 Bun 都已原生支持。

Bun 在流处理方面有三个关键优势:

特性Node.jsBun
TypeScript 支持需要 ts-node / tsc 编译原生执行,零编译步骤
Web Streams APIv18+ 实验性支持内置,全面兼容 WHATWG 规范
文件 I/Ofs/promises + ReadableStream 转换Bun.file() 直接返回流
启动时间~80ms(简单脚本)~5ms
包管理npm(慢)内置,比 npm 快 20-30x
ℹ️
本教程基于 Bun 1.1+,全程使用 TypeScript 编写,无需任何 tsconfig.json 额外配置,直接运行 bun run xxx.ts 即可。

二、环境搭建与项目初始化

项目环境搭建流程图
1

安装 Bun

如果还没安装 Bun,一条命令搞定:

# macOS / Linux
curl -fsSL https://bun.sh/install | bash

# Windows(推荐 WSL2,或使用 PowerShell)
powershell -c "irm bun.sh/install.ps1 | iex"

# 验证安装
bun --version   # 应输出 1.1.x 或更高
Shell
2

初始化项目

mkdir bun-streams-demo && cd bun-streams-demo
bun init -y

# 安装 Bun 的 TypeScript 类型声明(可选,IDE 提示更友好)
bun add -d @types/bun
Shell

此时目录结构如下:

bun-streams-demo/
├── index.ts          # 入口文件
├── package.json
└── tsconfig.json     # bun init 自动生成,已预配置
Text
Bun 自动识别 tsconfig.json 中的 pathsstricttarget 等设置。对于流处理项目,默认配置足够,无需修改。

三、ReadableStream:数据源的创建与消费

ReadableStream 数据流转示意图

ReadableStream 是整个流处理体系的"源头"。理解它的工作原理是后续所有内容的基础。

3.1 基础用法:创建一个数值流

新建文件 01-readable.ts

// 01-readable.ts
const numberStream = new ReadableStream<number>({
  async pull(controller) {
    for (let i = 1; i <= 5; i++) {
      controller.enqueue(i);
      // 模拟异步数据产生(如 DB 查询、网络请求)
      await new Promise((r) => setTimeout(r, 100));
    }
    controller.close();
  },
});

// 推荐:使用 for await...of 消费流(最简洁的方式)
for await (const value of numberStream) {
  console.log("Received:", value);
}

// 输出:
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
TypeScript
bun run 01-readable.ts
Shell

3.2 使用 Reader 手动控制读取

有时需要更精细的控制——按需拉取,而非自动消费:

// 手动 reader 控制
const stream = new ReadableStream<string>({
  start(controller) {
    ["chunk-1", "chunk-2", "chunk-3"].forEach((c) => controller.enqueue(c));
    controller.close();
  },
});

const reader = stream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log("Read:", value);
  }
} finally {
  // ⚠️ 务必释放锁,否则流无法被其他消费者使用
  reader.releaseLock();
}
TypeScript
⚠️
常见陷阱:调用 getReader() 后必须调用 releaseLock(),否则流被锁定,后续的 pipeTo()pipeThrough() 都会报错。建议用 try/finally 确保释放。

四、WritableStream:背压控制与安全写入

WritableStream 背压机制对比图

WritableStream 是数据的"终点"。最关键的特性是背压(Backpressure)——当下游处理速度跟不上上游产出时,自动暂停上游,防止内存无限增长。

4.1 实现带背压的写入端

新建 02-writable.ts

// 02-writable.ts
const slowWriter = new WritableStream<Uint8Array>({
  start() {
    console.log("Writer started");
  },

  async write(chunk, _controller) {
    const text = new TextDecoder().decode(chunk);
    console.log(`Processing chunk: "${text}"`);
    // 模拟耗时操作(如写磁盘、调用 API)
    await new Promise((r) => setTimeout(r, 200));
    // write() 返回 Promise,流会等待它 resolve 后再接收下一个 chunk
    // 这就是背压机制的核心!
  },

  close() {
    console.log("Writer closed, all chunks processed");
  },

  abort(reason) {
    console.error("Writer aborted:", reason);
  },
});

const encoder = new TextEncoder();
const writer = slowWriter.getWriter();

try {
  await writer.write(encoder.encode("Hello"));
  await writer.write(encoder.encode("World"));
  await writer.write(encoder.encode("Bun Streams"));
  await writer.close();
} finally {
  writer.releaseLock();
}
TypeScript

4.2 背压感知的高吞吐写入

当写入大量数据时,应检测 desiredSize 来感知背压状态:

const writer = writable.getWriter();

async function writeWithBackpressure(data: string[]) {
  for (const item of data) {
    const ready = writer.ready;  // 等待背压释放
    if (ready) await ready;
    await writer.write(new TextEncoder().encode(item));
  }
  await writer.close();
}
TypeScript

五、TransformStream:数据转换管道

TransformStream 数据变换流程图

TransformStream 是流处理体系的"中间件"——接收上游数据,变换后输出给下游。它同时持有一个 ReadableStream(输出侧)和一个 WritableStream(输入侧),通过 pipeThrough() 串联在管道中。

5.1 文本转大写(基础示例)

// 03-transform.ts
function createUpperCaseTransform() {
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      const text = new TextDecoder().decode(chunk);
      controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
    },
  });
}

const input = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("hello "));
    controller.enqueue(new TextEncoder().encode("bun streams"));
    controller.close();
  },
});

const transformed = input.pipeThrough(createUpperCaseTransform());

// 用 Response 帮助将流收集为字符串(Bun 内置支持)
const result = await new Response(transformed).text();
console.log(result); // "HELLO BUN STREAMS"
TypeScript

5.2 JSON 解析转换流

实际场景中,常需要将原始字节流解析为结构化数据。以下实现一个 NDJSON(Newline Delimited JSON)解析器:

// NDJSON 解析:每行一个 JSON 对象
function createNDJSONParser<T>() {
  let buffer = "";

  return new TransformStream<string, T>({
    transform(chunk, controller) {
      buffer += chunk;
      const lines = buffer.split("\n");
      // 最后一段可能不完整,保留到下一个 chunk
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        if (line.trim()) {
          try {
            controller.enqueue(JSON.parse(line) as T);
          } catch (e) {
            controller.error(new Error(`JSON parse failed: ${line}`));
          }
        }
      }
    },
    flush(controller) {
      // 处理最后一行(无换行符结尾)
      if (buffer.trim()) {
        controller.enqueue(JSON.parse(buffer));
      }
    },
  });
}

// 使用示例
type LogEntry = { level: string; message: string; ts: number };

const ndjsonStream = new ReadableStream<string>({
  start(c) {
    c.enqueue('{"level":"info","message":"Server started","ts":1709000000}\n');
    c.enqueue('{"level":"warn","message":"High memory","ts":1709000001}\n');
    c.enqueue('{"level":"error","message":"DB timeout","ts":1709000002}');
    c.close();
  },
});

for await (const entry of ndjsonStream.pipeThrough(createNDJSONParser<LogEntry>())) {
  console.log(`[${entry.level.toUpperCase()}] ${entry.message}`);
}
TypeScript

六、链式管道:构建复杂处理流

链式流处理管道架构图

真实的流处理系统往往需要多个变换步骤串联。Bun 的 pipeThrough() 支持无限链式组合:

数据源
ReadableStream
解压缩
DecompressionStream
解码
TextDecoderStream
过滤/转换
TransformStream
写入目标
WritableStream

6.1 日志处理管道实战

新建 04-pipeline.ts,实现一个完整的日志处理管道:读取原始日志 → 解析 NDJSON → 过滤错误级别 → 格式化输出:

// 04-pipeline.ts
// 通用工具:创建过滤转换流
function createFilter<T>(predicate: (item: T) => boolean) {
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      if (predicate(chunk)) controller.enqueue(chunk);
    },
  });
}

// 通用工具:创建映射转换流
function createMap<T, U>(mapper: (item: T) => U) {
  return new TransformStream<T, U>({
    transform(chunk, controller) {
      controller.enqueue(mapper(chunk));
    },
  });
}

type LogEntry = { level: string; message: string; ts: number };

// 模拟日志数据源(真实场景可替换为文件流或网络流)
const rawLogSource = new ReadableStream<string>({
  start(c) {
    const logs: LogEntry[] = [
      { level: "debug", message: "Cache hit", ts: 1709000000 },
      { level: "info", message: "Request handled", ts: 1709000001 },
      { level: "error", message: "DB connection failed", ts: 1709000002 },
      { level: "warn", message: "Memory usage 85%", ts: 1709000003 },
      { level: "error", message: "Timeout after 30s", ts: 1709000004 },
    ];
    logs.forEach((log) => c.enqueue(JSON.stringify(log) + "\n"));
    c.close();
  },
});

// 构建处理管道:解析 → 过滤 error → 格式化
const errorAlerts = rawLogSource
  .pipeThrough(createNDJSONParser<LogEntry>())
  .pipeThrough(createFilter((e) => e.level === "error"))
  .pipeThrough(
    createMap((e) => ({
      ...e,
      formatted: `🚨 [${new Date(e.ts * 1000).toISOString()}] ${e.message}`,
    }))
  );

for await (const alert of errorAlerts) {
  console.log(alert.formatted);
}
// 输出:
// 🚨 [2024-02-27T10:13:22.000Z] DB connection failed
// 🚨 [2024-02-27T10:13:24.000Z] Timeout after 30s
TypeScript
设计原则:将每个变换封装为独立的工厂函数(如 createFiltercreateMap),可以在不同管道中复用,也更容易单独测试。

七、文件流:高效处理大文件

Bun 文件流高性能处理示意图

处理大文件时,绝对不能一次性加载到内存。Bun 的 Bun.file() 直接返回流兼容的 BunFile 对象,配合 .stream() 方法实现真正的惰性读取。

7.1 流式读取大 CSV 文件

// 05-file-stream.ts
// 创建测试文件(真实场景替换为实际路径)
await Bun.write("data.csv", [
  "id,name,score",
  "1,Alice,95",
  "2,Bob,78",
  "3,Charlie,88",
  "4,Diana,92",
].join("\n"));

// 流式读取并逐行处理(无论文件多大,内存使用恒定)
const file = Bun.file("data.csv");
const stream = file.stream();

// 用内置的 TextDecoderStream 解码字节流
const textStream = stream.pipeThrough(new TextDecoderStream());

let isFirstLine = true;
let buffer = "";
const results: { id: number; name: string; score: number }[] = [];

for await (const chunk of textStream) {
  buffer += chunk;
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const line of lines) {
    if (isFirstLine) { isFirstLine = false; continue; } // skip header
    const [id, name, score] = line.split(",");
    if (id) results.push({ id: Number(id), name, score: Number(score) });
  }
}

// 处理最后一行
if (buffer.trim()) {
  const [id, name, score] = buffer.split(",");
  results.push({ id: Number(id), name, score: Number(score) });
}

console.log("Top scorers:", results.filter((r) => r.score >= 90));
// [{ id: 1, name: 'Alice', score: 95 }, { id: 4, name: 'Diana', score: 92 }]
TypeScript

7.2 流式复制并转换(管道到文件)

// 流式读取源文件 → 转换 → 写入目标文件
const sourceStream = Bun.file("data.csv").stream();

const upperTransform = new TransformStream<Uint8Array, Uint8Array>({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(new TextEncoder().encode(text.toUpperCase()));
  },
});

// Bun.file().writer() 返回 WritableStream 兼容对象
const dest = Bun.file("data-upper.csv").writer();

await sourceStream
  .pipeThrough(upperTransform)
  .pipeTo(dest);

console.log("Done! Check data-upper.csv");
TypeScript
ℹ️
Bun.file().writer() vs WritableStream:Bun.file().writer() 返回的是 FileSink,实现了 UnderlyingSink 接口,可以直接作为 pipeTo() 的目标,内部使用零拷贝 I/O,性能极佳。

八、进阶优化与生产实践

Bun 流处理性能对比与最佳实践

8.1 使用 Bun.ArrayBufferSink 高性能缓冲

需要将流数据收集到内存时,Bun.ArrayBufferSink 比手动拼接 Uint8Array 快很多:

// 高性能缓冲收集
const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 64,  // 预分配 64KB 缓冲区
  stream: false,              // false = 一次性收集模式
});

const stream = Bun.file("large-file.bin").stream();
for await (const chunk of stream) {
  sink.write(chunk);
}

const buffer = sink.end() as ArrayBuffer;
console.log(`Collected ${buffer.byteLength} bytes`);
TypeScript

8.2 错误处理与流取消

生产系统必须优雅处理错误。以下是推荐的错误处理模式:

// 带错误恢复的管道执行器
async function runPipeline<T>(
  source: ReadableStream<T>,
  transforms: TransformStream<any, any>[],
  destination: WritableStream<any>
): Promise<{ success: boolean; error?: string }> {
  let current: ReadableStream = source;

  for (const transform of transforms) {
    current = current.pipeThrough(transform);
  }

  try {
    await current.pipeTo(destination);
    return { success: true };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.error("Pipeline failed:", message);
    return { success: false, error: message };
  }
}

// 带超时的流读取(防止流卡死)
async function readWithTimeout<T>(
  stream: ReadableStream<T>,
  timeoutMs: number
): Promise<T[]> {
  const results: T[] = [];
  const abortController = new AbortController();
  const timer = setTimeout(() => abortController.abort(), timeoutMs);

  try {
    for await (const item of stream) {
      if (abortController.signal.aborted) break;
      results.push(item);
    }
  } finally {
    clearTimeout(timer);
  }
  return results;
}
TypeScript

8.3 Bun 内置流性能数据

5ms
Bun 启动时间
vs Node.js 80ms
30x
包安装速度
vs npm
TypeScript
编译步骤
~0
大文件内存
增量(流模式)

8.4 生产最佳实践清单

始终释放 Reader/Writer 锁 使用 try/finally 确保清理 监控 desiredSize 感知背压 拆分独立 TransformStream 工厂 大文件永远用流,不用 readFile 错误用 controller.error() 传播 善用 flush() 处理最后一批数据

常见问题解答

Bun 的 Web Streams 和 Node.js stream 模块能混用吗?
可以部分混用。Node.js Readable 实现了 async iterable 接口,可以直接传给 new Response(readable) 或用 for await...of 消费。反方向(Web Stream → Node.js Readable)需要用 Readable.fromWeb(webStream)。尽量统一使用 Web Streams API,避免混用带来的复杂性。
流处理中如何调试?console.log 能看到数据吗?
可以插入一个"透明"的调试 TransformStream:stream.pipeThrough(createTap(chunk => console.log('[DEBUG]', chunk))),其中 createTap 在 transform 中 enqueue 原始 chunk 不做修改。这样不影响管道逻辑,又能观察数据流经过每个阶段的状态。
怎么对流处理写单元测试?
Bun 内置测试框架 bun test,天然支持 async/await。测试流时,将 ReadableStream 数据收集到数组后断言,或将输出 pipeTo 一个自定义 WritableStream 来捕获结果,非常简洁。
TransformStream 中的 flush() 什么时候会被调用?
当上游的 ReadableStream 关闭(调用 controller.close())后,所有 chunk 处理完毕,WritableStream 端发送 close 信号,此时 flush() 被调用。适合处理缓冲区中剩余的不完整数据(如 NDJSON 最后一行没有换行符的情况)。