构建基于Playwright、NATS JetStream与数据湖的弹性非结构化数据采集总线


获取高质量、结构化的API数据是一回事,但现实世界中,大量关键信息被锁定在动态渲染、需要复杂用户交互才能访问的Web应用里。单纯依赖API,我们丢失了用户视角下的真实页面呈现、第三方脚本行为以及通过交互才能触发的数据。最初我们尝试构建一个简单的爬虫集群,用一个中央调度器通过HTTP直接分发任务给Playwright工作节点。这种架构的脆弱性在第一次遇到网络抖动时就暴露无遗:任务丢失、状态不一致、节点故障导致数据采集黑洞,整个系统耦合度极高,难以维护和扩展。

问题的核心在于缺乏一个可靠的、解耦的通信与任务持久化层。我们需要一个总线,它不仅能分发任务,还能保证任务不丢失,即使工作节点崩溃;它不仅能接收结果,还能缓冲数据,应对下游数据湖的暂时不可用。这就是我们将目光投向NATS JetStream的原因。它提供的持久化流、消费者确认机制以及重投递策略,恰好是解决我们痛点的关键。

我们的目标是构建一个弹性的非结构化数据采集总线。在这个架构中,Playwright工作节点是纯粹的执行单元,它们从一个持久化的任务流中拉取任务,执行浏览器自动化操作,然后将结果(包括HTML快照、截图、HAR文件等)推送到另一个结果流中。一个独立的摄入服务消费结果流,并将这些非结构化数据归档到S3兼容的数据湖(我们使用MinIO)中。整个系统通过NATS JetStream进行解耦,实现了水平扩展和故障容错。

架构设计与核心组件

整个系统的生命周期围绕着消息的流动。它不是一个请求-响应模型,而是一个纯粹的事件驱动架构。

graph TD
    subgraph "任务发布方 (Dispatcher)"
        A[Dispatcher App] -- "发布采集任务 (JSON)" --> B(NATS JetStream);
    end

    subgraph "NATS JetStream (消息总线)"
        B -- "tasks.scrape" --> C{Stream: TASKS};
        D{Stream: RESULTS} -- "results.archive" --> E(NATS JetStream);
    end

    subgraph "弹性工作节点池 (Playwright Workers)"
        F[Worker 1] -->|拉取任务| C;
        G[Worker 2] -->|拉取任务| C;
        H[Worker N] -->|拉取任务| C;

        F -- "发布采集结果" --> E;
        G -- "发布采集结果" --> E;
        H -- "发布采集结果" --> E;
    end

    subgraph "数据湖摄入服务 (Ingestion Service)"
        I[Ingester Service] -->|消费结果| D;
        I -- "写入原始数据" --> J[(Data Lake / MinIO)];
    end

    style F fill:#d4fcd7,stroke:#333,stroke-width:2px
    style G fill:#d4fcd7,stroke:#333,stroke-width:2px
    style H fill:#d4fcd7,stroke:#333,stroke-width:2px
    style I fill:#fcc0d7,stroke:#333,stroke-width:2px
  1. NATS JetStream: 系统的核心。我们定义了两个流(Stream):

    • TASKS: 用于存放待处理的抓取任务。任务发布后会一直保留,直到被工作节点成功处理并确认。我们使用 WorkQueue 保留策略,确保消息在被确认前只会被一个消费者处理。
    • RESULTS: 用于存放Playwright工作节点的产出。下游的摄入服务会消费这些结果。我们使用 Limits 保留策略,可以根据消息数量或年龄进行清理,防止结果无限堆积。
  2. Playwright Worker: 无状态的Node.js/TypeScript应用。它们是纯粹的消费者和生产者。启动后,连接到NATS,从TASKS流中拉取任务,执行Playwright脚本,然后将结果(成功或失败)发布到RESULTS流。关键点在于,只有当结果成功发布到RESULTS流后,它才会向NATS发送ack()确认,将任务从TASKS流中移除。如果工作节点在处理过程中崩溃,NATS会因为长时间未收到ack而将任务重新分发给另一个健康的工作节点。

  3. Ingestion Service: 另一个独立的微服务,它的唯一职责是从RESULTS流中消费数据,并将其持久化到MinIO。这种分离确保了数据采集和数据存储的关注点分离。即使MinIO暂时不可用,数据也会安全地保留在NATS JetStream的RESULTS流中,待服务恢复后继续处理。

生产级代码实现

让我们深入代码。这是一个基于TypeScript的Monorepo项目结构,包含worker, ingesterdispatcher三个包。

1. NATS JetStream 初始化与配置

在生产环境中,流的定义应该通过代码或运维脚本进行幂等管理。

common/nats-setup.ts

import { connect, JetStreamManager, NatsConnection, StreamConfig } from 'nats';

const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";

export const STREAMS = {
  TASKS: "TASKS",
  RESULTS: "RESULTS",
};

export const SUBJECTS = {
  TASK_SCRAPE: "tasks.scrape",
  RESULT_ARCHIVE: "results.archive",
};

async function ensureStream(jsm: JetStreamManager, config: Partial<StreamConfig>) {
  try {
    const stream = await jsm.streams.info(config.name!);
    // 在真实项目中,这里可能需要对比并更新配置
    console.log(`Stream ${config.name} already exists.`);
  } catch (err: any) {
    if (err.code === '404') {
      await jsm.streams.add(config);
      console.log(`Stream ${config.name} created.`);
    } else {
      throw err;
    }
  }
}

export async function setupNatsStreams(): Promise<NatsConnection> {
  const nc = await connect({ servers: NATS_URL });
  const jsm = await nc.jetstreamManager();

  // 任务流: 确保任务至少被处理一次
  await ensureStream(jsm, {
    name: STREAMS.TASKS,
    subjects: [`${SUBJECTS.TASK_SCRAPE}`],
    storage: 'file', // 持久化到磁盘
    retention: 'workqueue', // 消息被消费确认后删除
    ack_policy: 'explicit', // 需要显式确认
    max_age: 24 * 60 * 60 * 1_000_000_000, // 任务有效期24小时 (单位: ns)
    num_replicas: 1, // 在单节点测试环境中为1
  });

  // 结果流: 存储采集结果,允许下游服务宕机一段时间
  await ensureStream(jsm, {
    name: STREAMS.RESULTS,
    subjects: [`${SUBJECTS.RESULT_ARCHIVE}`],
    storage: 'file',
    retention: 'limits',
    max_bytes: 10 * 1024 * 1024 * 1024, // 最大10GB
    max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 最长保留7天
    num_replicas: 1,
  });

  console.log("NATS JetStream streams configured.");
  return nc;
}

这段代码的核心是ensureStream,它保证了我们的流配置是声明式的。ack_policy: 'explicit'是实现可靠性的基石。

2. 弹性的Playwright工作节点

工作节点是系统的“肌肉”,它必须健壮。

worker/src/worker.ts

import { chromium, Browser, Page } from 'playwright';
import { connect, JetStreamClient, JSONCodec, NatsConnection } from 'nats';
import { v4 as uuidv4 } from 'uuid';
import { STREAMS, SUBJECTS } from '../../common/nats-setup';

// 定义任务和结果的数据结构
interface ScrapeTask {
  taskId: string;
  url: string;
  steps: { action: 'goto' | 'screenshot' | 'html'; selector?: string }[];
}

interface ScrapeResult {
  taskId: string;
  workerId: string;
  url: string;
  success: boolean;
  data?: { type: 'html' | 'screenshot'; content: string; key: string }; // content for html, key for screenshot
  error?: string;
  timestamp: string;
}

const WORKER_ID = `worker-${uuidv4()}`;
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const jsonCodec = JSONCodec();

class ScraperWorker {
  private nc!: NatsConnection;
  private js!: JetStreamClient;
  private browser!: Browser;

  async start() {
    console.log(`[${WORKER_ID}] Starting...`);
    this.nc = await connect({ servers: NATS_URL, name: WORKER_ID });
    this.js = this.nc.jetstream();
    this.browser = await chromium.launch({ headless: true });

    // 创建一个持久化的消费者
    const consumer = await this.js.consumers.get(STREAMS.TASKS, {
        durable_name: 'scraping-worker-pool', // 所有worker共享同一个durable name,形成竞争消费
        ack_policy: 'explicit',
        max_ack_pending: 5, // 同时处理最多5个任务
    });

    console.log(`[${WORKER_ID}] Ready and waiting for tasks...`);
    // 拉取消息,而不是推送。这提供了更好的流控。
    const messages = await consumer.consume();
    for await (const msg of messages) {
        const task = jsonCodec.decode(msg.data) as ScrapeTask;
        console.log(`[${WORKER_ID}] Received task ${task.taskId} for URL: ${task.url}`);

        let result: ScrapeResult;
        try {
            const page = await this.browser.newPage();
            // 在生产中,这里的超时和重试逻辑需要更复杂
            await page.goto(task.url, { waitUntil: 'domcontentloaded', timeout: 30000 });
            
            // 简单示例:获取HTML
            const htmlContent = await page.content();
            const resultKey = `${new Date().toISOString()}-${task.taskId}.html`;

            await page.close();

            result = {
                taskId: task.taskId,
                workerId: WORKER_ID,
                url: task.url,
                success: true,
                data: { type: 'html', content: htmlContent, key: resultKey },
                timestamp: new Date().toISOString(),
            };

        } catch (error: any) {
            console.error(`[${WORKER_ID}] Failed task ${task.taskId}: ${error.message}`);
            result = {
                taskId: task.taskId,
                workerId: WORKER_ID,
                url: task.url,
                success: false,
                error: error.message,
                timestamp: new Date().toISOString(),
            };
        }

        // 关键步骤:先发布结果,再确认任务
        try {
            await this.js.publish(SUBJECTS.RESULT_ARCHIVE, jsonCodec.encode(result));
            msg.ack(); // 任务处理完成,从TASKS流中移除
            console.log(`[${WORKER_ID}] Task ${task.taskId} processed and acknowledged.`);
        } catch (pubError) {
            console.error(`[${WORKER_ID}] CRITICAL: Failed to publish result for task ${task.taskId}. Task will be redelivered.`, pubError);
            // 这里不ack,让NATS超时后重投递任务
        }
    }
  }

  async stop() {
    console.log(`[${WORKER_ID}] Shutting down...`);
    await this.browser.close();
    await this.nc.close();
  }
}

const worker = new ScraperWorker();
worker.start();

process.on('SIGINT', () => worker.stop());
process.on('SIGTERM', () => worker.stop());

这里的 durable_name: 'scraping-worker-pool' 是实现水平扩展的关键。所有使用相同 durable_name 的消费者构成一个消费组,NATS JetStream会确保一个消息只被组内的一个消费者处理。如果一个worker崩溃,它正在处理(但未ack)的消息将在一个可配置的超时后(ack_wait)被重新投递给消费组里的其他worker。

3. 数据湖摄入服务

该服务负责将采集到的原始数据安全地存入MinIO。

ingester/src/ingester.ts

import { Client as MinioClient } from 'minio';
import { connect, JetStreamClient, JSONCodec, NatsConnection } from 'nats';
import { STREAMS, SUBJECTS } from '../../common/nats-setup';

const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT || 'localhost';
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '9000', 10);
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY || 'minioadmin';
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY || 'minioadmin';
const BUCKET_NAME = 'raw-web-data';

const jsonCodec = JSONCodec();

// 定义接收的数据结构
interface ScrapeResult {
  taskId: string;
  url: string;
  success: boolean;
  data?: { type: 'html' | 'screenshot'; content: string; key: string };
  error?: string;
}

class IngesterService {
  private nc!: NatsConnection;
  private js!: JetStreamClient;
  private minioClient: MinioClient;

  constructor() {
    this.minioClient = new MinioClient({
      endPoint: MINIO_ENDPOINT,
      port: MINIO_PORT,
      useSSL: false,
      accessKey: MINIO_ACCESS_KEY,
      secretKey: MINIO_SECRET_KEY,
    });
  }

  async start() {
    console.log('[Ingester] Starting...');
    this.nc = await connect({ servers: NATS_URL, name: "ingester-service" });
    this.js = this.nc.jetstream();

    // 确保bucket存在
    const bucketExists = await this.minioClient.bucketExists(BUCKET_NAME);
    if (!bucketExists) {
        console.log(`[Ingester] Bucket ${BUCKET_NAME} does not exist. Creating...`);
        await this.minioClient.makeBucket(BUCKET_NAME, 'us-east-1');
    }

    const consumer = await this.js.consumers.get(STREAMS.RESULTS, {
        durable_name: 'data-lake-ingester',
        ack_policy: 'explicit',
    });
    
    console.log('[Ingester] Ready and waiting for results...');
    const messages = await consumer.consume();
    for await (const msg of messages) {
        const result = jsonCodec.decode(msg.data) as ScrapeResult;
        console.log(`[Ingester] Received result for task ${result.taskId}`);

        if (result.success && result.data) {
            try {
                // 根据日期和任务ID组织存储路径
                const date = new Date();
                const path = `${date.getFullYear()}/${(date.getMonth() + 1).toString().padStart(2, '0')}/${date.getDate().toString().padStart(2, '0')}/${result.taskId}/${result.data.key}`;

                await this.minioClient.putObject(
                    BUCKET_NAME,
                    path,
                    Buffer.from(result.data.content), // 在真实场景中,截图等二进制文件需要特殊处理
                    { 'Content-Type': 'text/html' }
                );

                console.log(`[Ingester] Stored result for ${result.taskId} at ${path}`);
                msg.ack();

            } catch (storageError) {
                console.error(`[Ingester] CRITICAL: Failed to store result for ${result.taskId} in MinIO. Will not ACK.`, storageError);
                // 存储失败,不ack,消息将重投
            }
        } else {
            // 处理失败的任务,可以记录到日志或另一个NATS流
            console.warn(`[Ingester] Skipping failed task ${result.taskId}: ${result.error}`);
            msg.ack(); // 即使是失败的结果,我们也消费掉,避免无限重试
        }
    }
  }

  async stop() {
    console.log('[Ingester] Shutting down...');
    await this.nc.close();
  }
}

const ingester = new IngesterService();
ingester.start();

process.on('SIGINT', () => ingester.stop());
process.on('SIGTERM', () => ingester.stop());

4. 任务发布器

一个简单的脚本,用于向系统中注入任务。

dispatcher/src/dispatcher.ts

import { connect, JSONCodec } from 'nats';
import { v4 as uuidv4 } from 'uuid';
import { SUBJECTS } from '../../common/nats-setup';

const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const jsonCodec = JSONCodec();

async function main() {
    const nc = await connect({ servers: NATS_URL });
    const js = nc.jetstream();

    const tasksToDispatch = [
        { url: 'https://www.google.com', steps: [{ action: 'html' }] },
        { url: 'https://github.com', steps: [{ action: 'html' }] },
        // 在真实项目中,这里会从数据库或API读取任务
    ];

    for (const task of tasksToDispatch) {
        const taskId = uuidv4();
        const payload = { taskId, ...task };
        
        await js.publish(SUBJECTS.TASK_SCRAPE, jsonCodec.encode(payload), {
            msgID: taskId, // 使用任务ID作为消息ID,用于去重
        });
        console.log(`Dispatched task ${taskId} for URL: ${task.url}`);
    }

    await nc.drain();
}

main().catch(err => console.error(err));

使用msgID可以在JetStream层面实现消息的幂等性,防止因网络问题导致同一个任务被重复发布。

局限性与未来迭代方向

这个架构解决了核心的可靠性和扩展性问题,但它并非终点。在真实生产环境中,还需要考虑以下几点:

  1. 反爬策略: 当前的工作节点非常“天真”。现实世界的网站有复杂的反爬虫机制。下一步需要集成代理轮换服务、验证码识别API,并在Playwright层面模拟更真实的用户行为(例如随机延迟、鼠标移动)。
  2. 资源管理与成本: Playwright(尤其是带界面的浏览器)是资源密集型的。无限制地启动工作节点会导致成本失控。需要将工作节点容器化,部署到Kubernetes上,并利用KEDA (Kubernetes Event-driven Autoscaling) 根据TASKS流中的消息积压数量来动态伸缩工作节点的副本数。
  3. 可观测性: 当前系统缺乏监控。我们需要引入结构化日志,并通过Prometheus暴露关键指标,例如任务处理速率、失败率、NATS流的深度、工作节点的CPU/内存使用情况。集成OpenTelemetry进行分布式追踪,可以清晰地看到一个任务从发布到最终入湖的完整生命周期和耗时。
  4. 死信队列 (Dead-Letter Queue): 对于那些由于目标网站结构变化或bug导致持续失败的任务,无限重试会浪费大量资源。应为TASKS流配置一个死信队列策略,在达到最大重试次数后,将失败的任务消息转移到一个专门的流中,供人工分析和干预。
  5. 配置与任务复杂性: 当前的任务定义非常简单。需要设计一个更灵活的DSL(领域特定语言)来描述复杂的抓取流程,例如登录、表单提交、分页导航等,并将其作为任务Payload的一部分。

  目录