获取高质量、结构化的API数据是一回事,但现实世界中,大量关键信息被锁定在动态渲染、需要复杂用户交互才能访问的Web应用里。单纯依赖API,我们丢失了用户视角下的真实页面呈现、第三方脚本行为以及通过交互才能触发的数据。最初我们尝试构建一个简单的爬虫集群,用一个中央调度器通过HTTP直接分发任务给Playwright工作节点。这种架构的脆弱性在第一次遇到网络抖动时就暴露无遗:任务丢失、状态不一致、节点故障导致数据采集黑洞,整个系统耦合度极高,难以维护和扩展。
问题的核心在于缺乏一个可靠的、解耦的通信与任务持久化层。我们需要一个总线,它不仅能分发任务,还能保证任务不丢失,即使工作节点崩溃;它不仅能接收结果,还能缓冲数据,应对下游数据湖的暂时不可用。这就是我们将目光投向NATS JetStream的原因。它提供的持久化流、消费者确认机制以及重投递策略,恰好是解决我们痛点的关键。
我们的目标是构建一个弹性的非结构化数据采集总线。在这个架构中,Playwright工作节点是纯粹的执行单元,它们从一个持久化的任务流中拉取任务,执行浏览器自动化操作,然后将结果(包括HTML快照、截图、HAR文件等)推送到另一个结果流中。一个独立的摄入服务消费结果流,并将这些非结构化数据归档到S3兼容的数据湖(我们使用MinIO)中。整个系统通过NATS JetStream进行解耦,实现了水平扩展和故障容错。
架构设计与核心组件
整个系统的生命周期围绕着消息的流动。它不是一个请求-响应模型,而是一个纯粹的事件驱动架构。
graph TD subgraph "任务发布方 (Dispatcher)" A[Dispatcher App] -- "发布采集任务 (JSON)" --> B(NATS JetStream); end subgraph "NATS JetStream (消息总线)" B -- "tasks.scrape" --> C{Stream: TASKS}; D{Stream: RESULTS} -- "results.archive" --> E(NATS JetStream); end subgraph "弹性工作节点池 (Playwright Workers)" F[Worker 1] -->|拉取任务| C; G[Worker 2] -->|拉取任务| C; H[Worker N] -->|拉取任务| C; F -- "发布采集结果" --> E; G -- "发布采集结果" --> E; H -- "发布采集结果" --> E; end subgraph "数据湖摄入服务 (Ingestion Service)" I[Ingester Service] -->|消费结果| D; I -- "写入原始数据" --> J[(Data Lake / MinIO)]; end style F fill:#d4fcd7,stroke:#333,stroke-width:2px style G fill:#d4fcd7,stroke:#333,stroke-width:2px style H fill:#d4fcd7,stroke:#333,stroke-width:2px style I fill:#fcc0d7,stroke:#333,stroke-width:2px
NATS JetStream: 系统的核心。我们定义了两个流(Stream):
-
TASKS
: 用于存放待处理的抓取任务。任务发布后会一直保留,直到被工作节点成功处理并确认。我们使用WorkQueue
保留策略,确保消息在被确认前只会被一个消费者处理。 -
RESULTS
: 用于存放Playwright工作节点的产出。下游的摄入服务会消费这些结果。我们使用Limits
保留策略,可以根据消息数量或年龄进行清理,防止结果无限堆积。
-
Playwright Worker: 无状态的Node.js/TypeScript应用。它们是纯粹的消费者和生产者。启动后,连接到NATS,从
TASKS
流中拉取任务,执行Playwright脚本,然后将结果(成功或失败)发布到RESULTS
流。关键点在于,只有当结果成功发布到RESULTS
流后,它才会向NATS发送ack()
确认,将任务从TASKS
流中移除。如果工作节点在处理过程中崩溃,NATS会因为长时间未收到ack
而将任务重新分发给另一个健康的工作节点。Ingestion Service: 另一个独立的微服务,它的唯一职责是从
RESULTS
流中消费数据,并将其持久化到MinIO。这种分离确保了数据采集和数据存储的关注点分离。即使MinIO暂时不可用,数据也会安全地保留在NATS JetStream的RESULTS
流中,待服务恢复后继续处理。
生产级代码实现
让我们深入代码。这是一个基于TypeScript的Monorepo项目结构,包含worker
, ingester
和 dispatcher
三个包。
1. NATS JetStream 初始化与配置
在生产环境中,流的定义应该通过代码或运维脚本进行幂等管理。
common/nats-setup.ts
import { connect, JetStreamManager, NatsConnection, StreamConfig } from 'nats';
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
export const STREAMS = {
TASKS: "TASKS",
RESULTS: "RESULTS",
};
export const SUBJECTS = {
TASK_SCRAPE: "tasks.scrape",
RESULT_ARCHIVE: "results.archive",
};
async function ensureStream(jsm: JetStreamManager, config: Partial<StreamConfig>) {
try {
const stream = await jsm.streams.info(config.name!);
// 在真实项目中,这里可能需要对比并更新配置
console.log(`Stream ${config.name} already exists.`);
} catch (err: any) {
if (err.code === '404') {
await jsm.streams.add(config);
console.log(`Stream ${config.name} created.`);
} else {
throw err;
}
}
}
export async function setupNatsStreams(): Promise<NatsConnection> {
const nc = await connect({ servers: NATS_URL });
const jsm = await nc.jetstreamManager();
// 任务流: 确保任务至少被处理一次
await ensureStream(jsm, {
name: STREAMS.TASKS,
subjects: [`${SUBJECTS.TASK_SCRAPE}`],
storage: 'file', // 持久化到磁盘
retention: 'workqueue', // 消息被消费确认后删除
ack_policy: 'explicit', // 需要显式确认
max_age: 24 * 60 * 60 * 1_000_000_000, // 任务有效期24小时 (单位: ns)
num_replicas: 1, // 在单节点测试环境中为1
});
// 结果流: 存储采集结果,允许下游服务宕机一段时间
await ensureStream(jsm, {
name: STREAMS.RESULTS,
subjects: [`${SUBJECTS.RESULT_ARCHIVE}`],
storage: 'file',
retention: 'limits',
max_bytes: 10 * 1024 * 1024 * 1024, // 最大10GB
max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 最长保留7天
num_replicas: 1,
});
console.log("NATS JetStream streams configured.");
return nc;
}
这段代码的核心是ensureStream
,它保证了我们的流配置是声明式的。ack_policy: 'explicit'
是实现可靠性的基石。
2. 弹性的Playwright工作节点
工作节点是系统的“肌肉”,它必须健壮。
worker/src/worker.ts
import { chromium, Browser, Page } from 'playwright';
import { connect, JetStreamClient, JSONCodec, NatsConnection } from 'nats';
import { v4 as uuidv4 } from 'uuid';
import { STREAMS, SUBJECTS } from '../../common/nats-setup';
// 定义任务和结果的数据结构
interface ScrapeTask {
taskId: string;
url: string;
steps: { action: 'goto' | 'screenshot' | 'html'; selector?: string }[];
}
interface ScrapeResult {
taskId: string;
workerId: string;
url: string;
success: boolean;
data?: { type: 'html' | 'screenshot'; content: string; key: string }; // content for html, key for screenshot
error?: string;
timestamp: string;
}
const WORKER_ID = `worker-${uuidv4()}`;
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const jsonCodec = JSONCodec();
class ScraperWorker {
private nc!: NatsConnection;
private js!: JetStreamClient;
private browser!: Browser;
async start() {
console.log(`[${WORKER_ID}] Starting...`);
this.nc = await connect({ servers: NATS_URL, name: WORKER_ID });
this.js = this.nc.jetstream();
this.browser = await chromium.launch({ headless: true });
// 创建一个持久化的消费者
const consumer = await this.js.consumers.get(STREAMS.TASKS, {
durable_name: 'scraping-worker-pool', // 所有worker共享同一个durable name,形成竞争消费
ack_policy: 'explicit',
max_ack_pending: 5, // 同时处理最多5个任务
});
console.log(`[${WORKER_ID}] Ready and waiting for tasks...`);
// 拉取消息,而不是推送。这提供了更好的流控。
const messages = await consumer.consume();
for await (const msg of messages) {
const task = jsonCodec.decode(msg.data) as ScrapeTask;
console.log(`[${WORKER_ID}] Received task ${task.taskId} for URL: ${task.url}`);
let result: ScrapeResult;
try {
const page = await this.browser.newPage();
// 在生产中,这里的超时和重试逻辑需要更复杂
await page.goto(task.url, { waitUntil: 'domcontentloaded', timeout: 30000 });
// 简单示例:获取HTML
const htmlContent = await page.content();
const resultKey = `${new Date().toISOString()}-${task.taskId}.html`;
await page.close();
result = {
taskId: task.taskId,
workerId: WORKER_ID,
url: task.url,
success: true,
data: { type: 'html', content: htmlContent, key: resultKey },
timestamp: new Date().toISOString(),
};
} catch (error: any) {
console.error(`[${WORKER_ID}] Failed task ${task.taskId}: ${error.message}`);
result = {
taskId: task.taskId,
workerId: WORKER_ID,
url: task.url,
success: false,
error: error.message,
timestamp: new Date().toISOString(),
};
}
// 关键步骤:先发布结果,再确认任务
try {
await this.js.publish(SUBJECTS.RESULT_ARCHIVE, jsonCodec.encode(result));
msg.ack(); // 任务处理完成,从TASKS流中移除
console.log(`[${WORKER_ID}] Task ${task.taskId} processed and acknowledged.`);
} catch (pubError) {
console.error(`[${WORKER_ID}] CRITICAL: Failed to publish result for task ${task.taskId}. Task will be redelivered.`, pubError);
// 这里不ack,让NATS超时后重投递任务
}
}
}
async stop() {
console.log(`[${WORKER_ID}] Shutting down...`);
await this.browser.close();
await this.nc.close();
}
}
const worker = new ScraperWorker();
worker.start();
process.on('SIGINT', () => worker.stop());
process.on('SIGTERM', () => worker.stop());
这里的 durable_name: 'scraping-worker-pool'
是实现水平扩展的关键。所有使用相同 durable_name
的消费者构成一个消费组,NATS JetStream会确保一个消息只被组内的一个消费者处理。如果一个worker崩溃,它正在处理(但未ack
)的消息将在一个可配置的超时后(ack_wait
)被重新投递给消费组里的其他worker。
3. 数据湖摄入服务
该服务负责将采集到的原始数据安全地存入MinIO。
ingester/src/ingester.ts
import { Client as MinioClient } from 'minio';
import { connect, JetStreamClient, JSONCodec, NatsConnection } from 'nats';
import { STREAMS, SUBJECTS } from '../../common/nats-setup';
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT || 'localhost';
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '9000', 10);
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY || 'minioadmin';
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY || 'minioadmin';
const BUCKET_NAME = 'raw-web-data';
const jsonCodec = JSONCodec();
// 定义接收的数据结构
interface ScrapeResult {
taskId: string;
url: string;
success: boolean;
data?: { type: 'html' | 'screenshot'; content: string; key: string };
error?: string;
}
class IngesterService {
private nc!: NatsConnection;
private js!: JetStreamClient;
private minioClient: MinioClient;
constructor() {
this.minioClient = new MinioClient({
endPoint: MINIO_ENDPOINT,
port: MINIO_PORT,
useSSL: false,
accessKey: MINIO_ACCESS_KEY,
secretKey: MINIO_SECRET_KEY,
});
}
async start() {
console.log('[Ingester] Starting...');
this.nc = await connect({ servers: NATS_URL, name: "ingester-service" });
this.js = this.nc.jetstream();
// 确保bucket存在
const bucketExists = await this.minioClient.bucketExists(BUCKET_NAME);
if (!bucketExists) {
console.log(`[Ingester] Bucket ${BUCKET_NAME} does not exist. Creating...`);
await this.minioClient.makeBucket(BUCKET_NAME, 'us-east-1');
}
const consumer = await this.js.consumers.get(STREAMS.RESULTS, {
durable_name: 'data-lake-ingester',
ack_policy: 'explicit',
});
console.log('[Ingester] Ready and waiting for results...');
const messages = await consumer.consume();
for await (const msg of messages) {
const result = jsonCodec.decode(msg.data) as ScrapeResult;
console.log(`[Ingester] Received result for task ${result.taskId}`);
if (result.success && result.data) {
try {
// 根据日期和任务ID组织存储路径
const date = new Date();
const path = `${date.getFullYear()}/${(date.getMonth() + 1).toString().padStart(2, '0')}/${date.getDate().toString().padStart(2, '0')}/${result.taskId}/${result.data.key}`;
await this.minioClient.putObject(
BUCKET_NAME,
path,
Buffer.from(result.data.content), // 在真实场景中,截图等二进制文件需要特殊处理
{ 'Content-Type': 'text/html' }
);
console.log(`[Ingester] Stored result for ${result.taskId} at ${path}`);
msg.ack();
} catch (storageError) {
console.error(`[Ingester] CRITICAL: Failed to store result for ${result.taskId} in MinIO. Will not ACK.`, storageError);
// 存储失败,不ack,消息将重投
}
} else {
// 处理失败的任务,可以记录到日志或另一个NATS流
console.warn(`[Ingester] Skipping failed task ${result.taskId}: ${result.error}`);
msg.ack(); // 即使是失败的结果,我们也消费掉,避免无限重试
}
}
}
async stop() {
console.log('[Ingester] Shutting down...');
await this.nc.close();
}
}
const ingester = new IngesterService();
ingester.start();
process.on('SIGINT', () => ingester.stop());
process.on('SIGTERM', () => ingester.stop());
4. 任务发布器
一个简单的脚本,用于向系统中注入任务。
dispatcher/src/dispatcher.ts
import { connect, JSONCodec } from 'nats';
import { v4 as uuidv4 } from 'uuid';
import { SUBJECTS } from '../../common/nats-setup';
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const jsonCodec = JSONCodec();
async function main() {
const nc = await connect({ servers: NATS_URL });
const js = nc.jetstream();
const tasksToDispatch = [
{ url: 'https://www.google.com', steps: [{ action: 'html' }] },
{ url: 'https://github.com', steps: [{ action: 'html' }] },
// 在真实项目中,这里会从数据库或API读取任务
];
for (const task of tasksToDispatch) {
const taskId = uuidv4();
const payload = { taskId, ...task };
await js.publish(SUBJECTS.TASK_SCRAPE, jsonCodec.encode(payload), {
msgID: taskId, // 使用任务ID作为消息ID,用于去重
});
console.log(`Dispatched task ${taskId} for URL: ${task.url}`);
}
await nc.drain();
}
main().catch(err => console.error(err));
使用msgID
可以在JetStream层面实现消息的幂等性,防止因网络问题导致同一个任务被重复发布。
局限性与未来迭代方向
这个架构解决了核心的可靠性和扩展性问题,但它并非终点。在真实生产环境中,还需要考虑以下几点:
- 反爬策略: 当前的工作节点非常“天真”。现实世界的网站有复杂的反爬虫机制。下一步需要集成代理轮换服务、验证码识别API,并在Playwright层面模拟更真实的用户行为(例如随机延迟、鼠标移动)。
- 资源管理与成本: Playwright(尤其是带界面的浏览器)是资源密集型的。无限制地启动工作节点会导致成本失控。需要将工作节点容器化,部署到Kubernetes上,并利用KEDA (Kubernetes Event-driven Autoscaling) 根据
TASKS
流中的消息积压数量来动态伸缩工作节点的副本数。 - 可观测性: 当前系统缺乏监控。我们需要引入结构化日志,并通过Prometheus暴露关键指标,例如任务处理速率、失败率、NATS流的深度、工作节点的CPU/内存使用情况。集成OpenTelemetry进行分布式追踪,可以清晰地看到一个任务从发布到最终入湖的完整生命周期和耗时。
- 死信队列 (Dead-Letter Queue): 对于那些由于目标网站结构变化或bug导致持续失败的任务,无限重试会浪费大量资源。应为
TASKS
流配置一个死信队列策略,在达到最大重试次数后,将失败的任务消息转移到一个专门的流中,供人工分析和干预。 - 配置与任务复杂性: 当前的任务定义非常简单。需要设计一个更灵活的DSL(领域特定语言)来描述复杂的抓取流程,例如登录、表单提交、分页导航等,并将其作为任务Payload的一部分。