利用 Apache Spark 与 Git LFS 构建基于 SQLite 的原子化 CI 结果数据管道


我们的 monorepo CI 系统正在变成一个性能黑洞。一个看似无害的 PR 能触发数百个独立的构建和测试任务,而定位其中引入的性能衰退,完全依赖工程师的人工排查和直觉。日志散落在各处,缺乏结构,更不用说进行趋势分析了。我们需要一个系统,能将每次 Git提交触发的 CI 运行数据,转化为可查询、可度量的结构化信息。

最初的构想是走传统路线:构建一个 ETL 管道,将所有 CI 日志收集起来,清洗后导入一个中心化的数据仓库,比如 ClickHouse 或 BigQuery。但这套方案对于我们的核心需求——为开发者提供即时反馈——来说太慢、太重了。ETL 的延迟意味着开发者在推送代码后数小时才能看到性能报告。此外,保证单个 CI 运行所有相关数据的原子性加载,在传统 ETL 中也相当复杂。如果一个 CI 运行包含 100 个并行的 job,其中 5 个失败了,我们如何原子地表示这次运行的“部分成功”状态?

这促使我们重新思考问题的本质。我们需要的是一个轻量、原子、可追溯的数据单元。这个单元应该能完整封装一次独立 CI 任务(job)的所有关键产出。

技术选型决策:一个非主流的组合

我们最终选择的架构组合在旁人看来可能有些怪异,但它精确地解决了我们的痛点。

  1. 数据处理核心:Apache Spark
    这几乎是无需争论的选择。CI 日志规模庞大,格式不一,需要强大的分布式处理能力进行解析、聚合和计算。Spark 的灵活性及其在 Scala 和 Python 中的表达力,使其成为处理这种半结构化数据的理想工具。

  2. 数据原子载体:SQLite
    这是整个架构中最具争议也最关键的一环。我们放弃了将处理结果直接写入分布式文件系统(如 HDFS、S3)或数据库。取而代之,我们让每一个 Spark task 将其处理结果写入一个本地的 SQLite 数据库文件。为什么?

    • 原子性 (Atomicity): SQLite 的事务提供了完美的 ACID 保证。一个 Spark task 要么成功生成一个包含完整、一致结果的 .sqlite 文件,要么失败不产出任何东西。这解决了部分成功的问题。一个 CI job 的结果就是一个文件,存在即成功。
    • 可移植性 (Portability): SQLite 文件是单个文件,不依赖任何服务,可以在任何环境中轻松传递、存档和查询。
    • 自描述性 (Self-Describing): 数据库 schema 内置于文件中,数据和结构融为一体,避免了数据与元数据分离导致的管理难题。
  3. 数据版本化与存储:Git & Git LFS
    我们的代码版本管理用 Git,那为什么数据的版本不能也用 Git?我们将 Spark 生成的 .sqlite 文件视为构建产物(artifact),并使用 Git LFS (Large File Storage) 将它们提交到一个专门的数据仓库(Data Repo)。

    • 可追溯性 (Traceability): 数据仓库中的每一次提交都直接对应到代码仓库的一次 CI 运行。通过 Git commit hash,我们可以精确关联代码变更与它产生的数据结果。
    • 不可变性 (Immutability): Git 的历史是不可变的,这为我们的性能数据提供了审计日志。
  4. 前端展示引擎:Turbopack
    为了让开发者能快速消费这些数据,我们需要一个高性能的前端看板。看板需要展示复杂的构建依赖图、性能火焰图和历史趋势。这类数据密集型应用的开发迭代速度至关重要。我们选择了基于 Rust 的 Turbopack 作为构建工具,因为它提供了极致的开发服务器启动速度和热模块替换(HMR)性能,这在调试复杂 D3.js 可视化时节省了大量时间。

架构与数据流

整个流程被设计成一个事件驱动的闭环。

graph TD
    A[Developer pushes to Git Repo] -->|Webhook| B(CI System);
    B -->|Triggers hundreds of jobs| C{CI Jobs};
    C -->|Generate raw logs| D[Log Storage];
    B -->|CI Run Finished Webhook| E(Spark Driver Program);
    E -->|Reads raw logs from D| F[Spark Cluster];
    F -->|Process logs in parallel tasks| G(Spark Tasks);
    subgraph Spark Task Level
        G --> H{Generate local .sqlite file};
        H -->|Use ACID Transactions| I(SQLite Engine);
    end
    G --> J[Collect .sqlite files];
    E -->|After job success| K(Git LFS Commit Script);
    K -->|Commit .sqlite files| L(Data Artifact Git Repo);
    M[Developer Dashboard] -->|API Call| N(Data Query Service);
    N -->|git checkout & query .sqlite| L;
    M -->|Built with Turbopack| O(Developer's Browser);

    style H fill:#f9f,stroke:#333,stroke-width:2px
    style L fill:#ccf,stroke:#333,stroke-width:2px

这个流程的核心在于,我们将大规模的分布式计算(Spark)的产出,固化为一系列小规模、原子化、具备强一致性的本地数据单元(SQLite),并利用版本控制系统(Git)来管理这些数据单元的生命周期。

核心实现:Spark 任务与 SQLite 的共舞

我们的 Spark 应用是使用 Scala 编写的,因为它能更好地控制 JVM 行为且与 Spark 原生 API 结合更紧密。下面是一个被简化的 Spark task 核心处理逻辑,它负责处理单个 CI job 的日志。

// build.sbt
// libraryDependencies ++= Seq(
//   "org.apache.spark" %% "spark-core" % "3.4.1",
//   "org.apache.spark" %% "spark-sql" % "3.4.1",
//   "org.xerial" % "sqlite-jdbc" % "3.42.0.0"
// )

import org.apache.spark.sql.{SparkSession, Row}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.io.File
import java.util.UUID
import scala.util.{Try, Using, Failure, Success}
import org.slf4j.LoggerFactory

object CiLogProcessor {
  private val logger = LoggerFactory.getLogger(this.getClass)

  // 数据结构定义
  case class BuildStep(name: String, durationMs: Long, status: String)
  case class TestResult(suite: String, name: String, durationMs: Long, result: String)
  case class JobMetadata(jobId: String, commitHash: String, startTime: Long)

  def processJobLogs(
      spark: SparkSession,
      jobId: String,
      commitHash: String,
      rawLogPath: String
  ): Try[String] = {
    
    // 为这个任务创建一个唯一的本地 SQLite 文件路径
    val tempDir = System.getProperty("java.io.tmpdir")
    val dbFileName = s"${jobId}_${UUID.randomUUID().toString}.sqlite"
    val dbPath = new File(tempDir, dbFileName).getAbsolutePath
    val dbUrl = s"jdbc:sqlite:$dbPath"
    
    logger.info(s"Processing job '$jobId' for commit '$commitHash'. Outputting to $dbPath")

    // 1. 初始化数据库 Schema
    // 使用 Using 来自动管理资源,确保连接被关闭
    val schemaCreationResult = Using(DriverManager.getConnection(dbUrl)) { conn =>
      createTables(conn)
    }.flatten

    if (schemaCreationResult.isFailure) {
      logger.error(s"Failed to initialize schema for $dbPath", schemaCreationResult.failed.get)
      return Failure(schemaCreationResult.failed.get)
    }

    // 2. 读取原始日志并处理
    // 在真实项目中,这里会有复杂的日志解析逻辑
    val logRDD = spark.sparkContext.textFile(rawLogPath)

    // 假设我们解析出了构建步骤和测试结果
    // 这里用假数据代替复杂的解析过程
    val fakeBuildSteps = Seq(
      BuildStep("checkout", 1500, "SUCCESS"),
      BuildStep("install_deps", 30000, "SUCCESS"),
      BuildStep("compile_code", 120000, "SUCCESS"),
      BuildStep("run_tests", 180000, "FAILURE")
    )
    val fakeTestResults = Seq(
      TestResult("suite1", "test_login", 50, "PASS"),
      TestResult("suite1", "test_logout", 30, "PASS"),
      TestResult("suite2", "test_api_call", 120, "FAIL"),
      TestResult("suite2", "test_validation", 80, "PASS")
    )
    val jobMetadata = JobMetadata(jobId, commitHash, System.currentTimeMillis())

    // 3. 将处理结果写入 SQLite,这是一个原子操作
    val insertResult = Using(DriverManager.getConnection(dbUrl)) { conn =>
      conn.setAutoCommit(false) // 关键:开启事务
      try {
        insertMetadata(conn, jobMetadata)
        insertBuildSteps(conn, fakeBuildSteps)
        insertTestResults(conn, fakeTestResults)
        conn.commit() // 关键:提交事务
        logger.info(s"Successfully committed data for job '$jobId' to $dbPath")
      } catch {
        case e: Exception =>
          logger.error(s"Exception during data insertion for job '$jobId', rolling back transaction.", e)
          conn.rollback() // 关键:发生错误则回滚
          throw e // 重新抛出异常,让 Spark 知道任务失败
      }
    }

    insertResult match {
      case Success(_) => Success(dbPath) // 成功,返回数据库文件路径
      case Failure(e) =>
        // 插入失败,清理掉可能产生的垃圾文件
        new File(dbPath).delete()
        Failure(e)
    }
  }

  private def createTables(conn: Connection): Try[Unit] = Try {
    val statement = conn.createStatement()
    statement.executeUpdate("""
      CREATE TABLE IF NOT EXISTS metadata (
        job_id TEXT PRIMARY KEY,
        commit_hash TEXT NOT NULL,
        start_time INTEGER NOT NULL
      );
    """)
    statement.executeUpdate("""
      CREATE TABLE IF NOT EXISTS build_steps (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        duration_ms INTEGER NOT NULL,
        status TEXT NOT NULL
      );
    """)
    statement.executeUpdate("""
      CREATE TABLE IF NOT EXISTS test_results (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        suite TEXT NOT NULL,
        name TEXT NOT NULL,
        duration_ms INTEGER NOT NULL,
        result TEXT NOT NULL
      );
    """)
    statement.close()
  }

  // 下面是具体的插入逻辑,使用 PreparedStatement 防止 SQL 注入
  private def insertMetadata(conn: Connection, data: JobMetadata): Unit = {
    val sql = "INSERT INTO metadata (job_id, commit_hash, start_time) VALUES (?, ?, ?);"
    Using.resource(conn.prepareStatement(sql)) { stmt =>
      stmt.setString(1, data.jobId)
      stmt.setString(2, data.commitHash)
      stmt.setLong(3, data.startTime)
      stmt.executeUpdate()
    }
  }

  private def insertBuildSteps(conn: Connection, steps: Seq[BuildStep]): Unit = {
    val sql = "INSERT INTO build_steps (name, duration_ms, status) VALUES (?, ?, ?);"
    Using.resource(conn.prepareStatement(sql)) { stmt =>
      for (step <- steps) {
        stmt.setString(1, step.name)
        stmt.setLong(2, step.durationMs)
        stmt.setString(3, step.status)
        stmt.addBatch()
      }
      stmt.executeBatch()
    }
  }

  private def insertTestResults(conn: Connection, results: Seq[TestResult]): Unit = {
    val sql = "INSERT INTO test_results (suite, name, duration_ms, result) VALUES (?, ?, ?, ?);"
    Using.resource(conn.prepareStatement(sql)) { stmt =>
      for (res <- results) {
        stmt.setString(1, res.suite)
        stmt.setString(2, res.name)
        stmt.setLong(3, res.durationMs)
        stmt.setString(4, res.result)
        stmt.addBatch()
      }
      stmt.executeBatch()
    }
  }
}

这段代码的核心思想是:每个 Spark task 都是一个独立的事务单元。它使用 JDBC 连接到一个本地 SQLite 文件,在事务中执行所有数据库写入操作。如果任何一步出错,整个事务会回滚,并且上层逻辑会捕获异常,最终导致 Spark task 失败。只有当所有数据都成功写入并commit后,这个 .sqlite 文件才被视为一个有效的、完整的产物。

产物管理:Git LFS 的自动化脚本

当 Spark 作业成功完成后,Driver 节点会收集所有 Executor 节点生成的 .sqlite 文件路径。接着,一个 post-processing 脚本会被触发,负责将这些文件提交到我们的数据仓库。

#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.

# 这些变量由 Spark Driver 作业传入
COMMIT_HASH="$1"
SPARK_OUTPUT_PATHS_FILE="$2" # 一个包含所有 .sqlite 文件路径的文本文件
DATA_REPO_PATH="/path/to/local/clone/of/data-repo"

if [ -z "$COMMIT_HASH" ] || [ -z "$SPARK_OUTPUT_PATHS_FILE" ]; then
  echo "Usage: $0 <commit_hash> <spark_output_paths_file>"
  exit 1
fi

cd "$DATA_REPO_PATH"

# 1. 确保数据仓库是最新的
git pull origin main --rebase

# 2. 创建一个以 commit hash 命名的子目录,用于存放本次运行的所有产物
TARGET_DIR="${DATA_REPO_PATH}/${COMMIT_HASH}"
mkdir -p "$TARGET_DIR"

# 3. 从路径文件中读取并移动所有 .sqlite 文件
while IFS= read -r sqlite_path; do
  if [ -f "$sqlite_path" ]; then
    echo "Moving $sqlite_path to $TARGET_DIR/"
    mv "$sqlite_path" "$TARGET_DIR/"
  else
    echo "Warning: File not found: $sqlite_path"
  fi
done < "$SPARK_OUTPUT_PATHS_FILE"

# 4. 使用 Git LFS 来追踪 .sqlite 文件
# 这个配置只需要在仓库初始化时做一次
# git lfs track "*.sqlite"
# git add .gitattributes

# 5. 将新产物添加到 Git
git add "${COMMIT_HASH}/"

# 6. 提交并推送
# 这里的 git user 需要配置为机器人账户
git commit -m "Add CI artifacts for commit ${COMMIT_HASH}"
git push origin main

echo "Successfully committed CI artifacts for ${COMMIT_HASH} to data repo."

这个脚本的逻辑很直白:为每次 CI 运行(由 COMMIT_HASH 标识)创建一个目录,将所有相关的 SQLite 文件移入,然后使用 git addgit commit 将它们纳入版本控制。由于我们预先配置了 Git LFS 追踪 .sqlite 文件,这些大文件实际上会被上传到 LFS 服务器,而 Git 仓库中只保留轻量的指针文件。

前端消费:Turbopack 赋能的快速迭代

数据管道的最后一环是展示。我们的开发者看板是一个 React 应用,需要处理和可视化大量数据。

一个典型的场景是展示某个 CI job 的构建步骤耗时分布饼图。

// package.json (scripts section)
"scripts": {
  "dev": "turbo dev", // 使用 Turbopack 启动开发服务器
  "build": "turbo build",
  "start": "turbo start"
},

使用 Turbopack 的好处在开发阶段体现得淋漓尽致。当我们调整 D3.js 的可视化逻辑或数据转换函数时,页面几乎是瞬时更新,无需等待漫长的重新打包。这对于需要频繁微调视觉效果的数据可视化开发来说,是颠覆性的体验提升。

// Simplified React component to fetch and display build steps

import React, { useState, useEffect } from 'react';
// 假设使用 chart.js 或 D3 来绘图
import { Pie } from 'react-chartjs-2'; 

function BuildStepChart({ commitHash, jobId }) {
  const [chartData, setChartData] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const fetchData = async () => {
      setLoading(true);
      try {
        // API 服务会处理 git checkout 和 sqlite query 的逻辑
        const response = await fetch(`/api/artifacts/${commitHash}/${jobId}/build_steps`);
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        const data = await response.json(); // API 返回 [{name, duration_ms, status}]
        
        // 转换数据为图表库需要的格式
        setChartData({
          labels: data.map(d => d.name),
          datasets: [
            {
              label: 'Build Step Duration (ms)',
              data: data.map(d => d.duration_ms),
              backgroundColor: [
                '#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF'
              ],
            },
          ],
        });
      } catch (e) {
        setError(e.message);
      } finally {
        setLoading(false);
      }
    };

    fetchData();
  }, [commitHash, jobId]);

  if (loading) return <div>Loading chart...</div>;
  if (error) return <div>Error: {error}</div>;
  if (!chartData) return <div>No data available.</div>;

  return (
    <div>
      <h3>Build Steps for {jobId}</h3>
      <Pie data={chartData} />
    </div>
  );
}

这个组件背后是一个简单的 API 服务,它接收 commitHashjobId,然后执行 git checkout 操作检出对应的数据文件,连接 SQLite 数据库,执行 SELECT * FROM build_steps;,并将结果序列化为 JSON 返回。

局限性与未来展望

这套架构并非没有缺点。一个显而易见的局限是跨 CI 运行的聚合查询性能。虽然查询单个 SQLite 文件非常快,但如果要分析“过去一个月所有失败测试的 Top 10”,就需要检出成千上万个 Git LFS 文件并逐个查询,效率极低。

为了解决这个问题,我们正在实施一个二级异步流程:每天晚上,一个独立的 Spark 作业会扫描当天新生成的 SQLite 产物,将它们的数据提取、合并,然后批量导入到一个真正的列式分析数据库(我们选择了 ClickHouse)中。这为我们提供了两全其美的方案:为开发者提供即时的、原子化的单次运行反馈,同时为平台维护者和数据分析师提供高效的、大规模的历史趋势分析能力。

另一个潜在问题是 Git LFS 仓库的膨胀。我们必须制定严格的数据保留策略和归档机制,例如,只保留近 3 个月的热数据在主 LFS 服务器上,更早的数据则归档到成本更低的冷存储中。

尽管存在这些挑战,但这套结合了 Spark、SQLite 和 Git 的非典型数据管道,在生产环境中被证明是极其有效的。它为我们混乱的 CI 流程带来了秩序、原子性和可追溯性,最终显著提升了开发团队定位和解决性能问题的效率。


  目录