我们的 monorepo CI 系统正在变成一个性能黑洞。一个看似无害的 PR 能触发数百个独立的构建和测试任务,而定位其中引入的性能衰退,完全依赖工程师的人工排查和直觉。日志散落在各处,缺乏结构,更不用说进行趋势分析了。我们需要一个系统,能将每次 Git提交触发的 CI 运行数据,转化为可查询、可度量的结构化信息。
最初的构想是走传统路线:构建一个 ETL 管道,将所有 CI 日志收集起来,清洗后导入一个中心化的数据仓库,比如 ClickHouse 或 BigQuery。但这套方案对于我们的核心需求——为开发者提供即时反馈——来说太慢、太重了。ETL 的延迟意味着开发者在推送代码后数小时才能看到性能报告。此外,保证单个 CI 运行所有相关数据的原子性加载,在传统 ETL 中也相当复杂。如果一个 CI 运行包含 100 个并行的 job,其中 5 个失败了,我们如何原子地表示这次运行的“部分成功”状态?
这促使我们重新思考问题的本质。我们需要的是一个轻量、原子、可追溯的数据单元。这个单元应该能完整封装一次独立 CI 任务(job)的所有关键产出。
技术选型决策:一个非主流的组合
我们最终选择的架构组合在旁人看来可能有些怪异,但它精确地解决了我们的痛点。
数据处理核心:Apache Spark
这几乎是无需争论的选择。CI 日志规模庞大,格式不一,需要强大的分布式处理能力进行解析、聚合和计算。Spark 的灵活性及其在 Scala 和 Python 中的表达力,使其成为处理这种半结构化数据的理想工具。数据原子载体:SQLite
这是整个架构中最具争议也最关键的一环。我们放弃了将处理结果直接写入分布式文件系统(如 HDFS、S3)或数据库。取而代之,我们让每一个 Spark task 将其处理结果写入一个本地的 SQLite 数据库文件。为什么?- 原子性 (Atomicity): SQLite 的事务提供了完美的
ACID
保证。一个 Spark task 要么成功生成一个包含完整、一致结果的.sqlite
文件,要么失败不产出任何东西。这解决了部分成功的问题。一个 CI job 的结果就是一个文件,存在即成功。 - 可移植性 (Portability): SQLite 文件是单个文件,不依赖任何服务,可以在任何环境中轻松传递、存档和查询。
- 自描述性 (Self-Describing): 数据库 schema 内置于文件中,数据和结构融为一体,避免了数据与元数据分离导致的管理难题。
- 原子性 (Atomicity): SQLite 的事务提供了完美的
数据版本化与存储:Git & Git LFS
我们的代码版本管理用 Git,那为什么数据的版本不能也用 Git?我们将 Spark 生成的.sqlite
文件视为构建产物(artifact),并使用 Git LFS (Large File Storage) 将它们提交到一个专门的数据仓库(Data Repo)。- 可追溯性 (Traceability): 数据仓库中的每一次提交都直接对应到代码仓库的一次 CI 运行。通过 Git commit hash,我们可以精确关联代码变更与它产生的数据结果。
- 不可变性 (Immutability): Git 的历史是不可变的,这为我们的性能数据提供了审计日志。
前端展示引擎:Turbopack
为了让开发者能快速消费这些数据,我们需要一个高性能的前端看板。看板需要展示复杂的构建依赖图、性能火焰图和历史趋势。这类数据密集型应用的开发迭代速度至关重要。我们选择了基于 Rust 的 Turbopack 作为构建工具,因为它提供了极致的开发服务器启动速度和热模块替换(HMR)性能,这在调试复杂 D3.js 可视化时节省了大量时间。
架构与数据流
整个流程被设计成一个事件驱动的闭环。
graph TD A[Developer pushes to Git Repo] -->|Webhook| B(CI System); B -->|Triggers hundreds of jobs| C{CI Jobs}; C -->|Generate raw logs| D[Log Storage]; B -->|CI Run Finished Webhook| E(Spark Driver Program); E -->|Reads raw logs from D| F[Spark Cluster]; F -->|Process logs in parallel tasks| G(Spark Tasks); subgraph Spark Task Level G --> H{Generate local .sqlite file}; H -->|Use ACID Transactions| I(SQLite Engine); end G --> J[Collect .sqlite files]; E -->|After job success| K(Git LFS Commit Script); K -->|Commit .sqlite files| L(Data Artifact Git Repo); M[Developer Dashboard] -->|API Call| N(Data Query Service); N -->|git checkout & query .sqlite| L; M -->|Built with Turbopack| O(Developer's Browser); style H fill:#f9f,stroke:#333,stroke-width:2px style L fill:#ccf,stroke:#333,stroke-width:2px
这个流程的核心在于,我们将大规模的分布式计算(Spark)的产出,固化为一系列小规模、原子化、具备强一致性的本地数据单元(SQLite),并利用版本控制系统(Git)来管理这些数据单元的生命周期。
核心实现:Spark 任务与 SQLite 的共舞
我们的 Spark 应用是使用 Scala 编写的,因为它能更好地控制 JVM 行为且与 Spark 原生 API 结合更紧密。下面是一个被简化的 Spark task 核心处理逻辑,它负责处理单个 CI job 的日志。
// build.sbt
// libraryDependencies ++= Seq(
// "org.apache.spark" %% "spark-core" % "3.4.1",
// "org.apache.spark" %% "spark-sql" % "3.4.1",
// "org.xerial" % "sqlite-jdbc" % "3.42.0.0"
// )
import org.apache.spark.sql.{SparkSession, Row}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.io.File
import java.util.UUID
import scala.util.{Try, Using, Failure, Success}
import org.slf4j.LoggerFactory
object CiLogProcessor {
private val logger = LoggerFactory.getLogger(this.getClass)
// 数据结构定义
case class BuildStep(name: String, durationMs: Long, status: String)
case class TestResult(suite: String, name: String, durationMs: Long, result: String)
case class JobMetadata(jobId: String, commitHash: String, startTime: Long)
def processJobLogs(
spark: SparkSession,
jobId: String,
commitHash: String,
rawLogPath: String
): Try[String] = {
// 为这个任务创建一个唯一的本地 SQLite 文件路径
val tempDir = System.getProperty("java.io.tmpdir")
val dbFileName = s"${jobId}_${UUID.randomUUID().toString}.sqlite"
val dbPath = new File(tempDir, dbFileName).getAbsolutePath
val dbUrl = s"jdbc:sqlite:$dbPath"
logger.info(s"Processing job '$jobId' for commit '$commitHash'. Outputting to $dbPath")
// 1. 初始化数据库 Schema
// 使用 Using 来自动管理资源,确保连接被关闭
val schemaCreationResult = Using(DriverManager.getConnection(dbUrl)) { conn =>
createTables(conn)
}.flatten
if (schemaCreationResult.isFailure) {
logger.error(s"Failed to initialize schema for $dbPath", schemaCreationResult.failed.get)
return Failure(schemaCreationResult.failed.get)
}
// 2. 读取原始日志并处理
// 在真实项目中,这里会有复杂的日志解析逻辑
val logRDD = spark.sparkContext.textFile(rawLogPath)
// 假设我们解析出了构建步骤和测试结果
// 这里用假数据代替复杂的解析过程
val fakeBuildSteps = Seq(
BuildStep("checkout", 1500, "SUCCESS"),
BuildStep("install_deps", 30000, "SUCCESS"),
BuildStep("compile_code", 120000, "SUCCESS"),
BuildStep("run_tests", 180000, "FAILURE")
)
val fakeTestResults = Seq(
TestResult("suite1", "test_login", 50, "PASS"),
TestResult("suite1", "test_logout", 30, "PASS"),
TestResult("suite2", "test_api_call", 120, "FAIL"),
TestResult("suite2", "test_validation", 80, "PASS")
)
val jobMetadata = JobMetadata(jobId, commitHash, System.currentTimeMillis())
// 3. 将处理结果写入 SQLite,这是一个原子操作
val insertResult = Using(DriverManager.getConnection(dbUrl)) { conn =>
conn.setAutoCommit(false) // 关键:开启事务
try {
insertMetadata(conn, jobMetadata)
insertBuildSteps(conn, fakeBuildSteps)
insertTestResults(conn, fakeTestResults)
conn.commit() // 关键:提交事务
logger.info(s"Successfully committed data for job '$jobId' to $dbPath")
} catch {
case e: Exception =>
logger.error(s"Exception during data insertion for job '$jobId', rolling back transaction.", e)
conn.rollback() // 关键:发生错误则回滚
throw e // 重新抛出异常,让 Spark 知道任务失败
}
}
insertResult match {
case Success(_) => Success(dbPath) // 成功,返回数据库文件路径
case Failure(e) =>
// 插入失败,清理掉可能产生的垃圾文件
new File(dbPath).delete()
Failure(e)
}
}
private def createTables(conn: Connection): Try[Unit] = Try {
val statement = conn.createStatement()
statement.executeUpdate("""
CREATE TABLE IF NOT EXISTS metadata (
job_id TEXT PRIMARY KEY,
commit_hash TEXT NOT NULL,
start_time INTEGER NOT NULL
);
""")
statement.executeUpdate("""
CREATE TABLE IF NOT EXISTS build_steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
duration_ms INTEGER NOT NULL,
status TEXT NOT NULL
);
""")
statement.executeUpdate("""
CREATE TABLE IF NOT EXISTS test_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
suite TEXT NOT NULL,
name TEXT NOT NULL,
duration_ms INTEGER NOT NULL,
result TEXT NOT NULL
);
""")
statement.close()
}
// 下面是具体的插入逻辑,使用 PreparedStatement 防止 SQL 注入
private def insertMetadata(conn: Connection, data: JobMetadata): Unit = {
val sql = "INSERT INTO metadata (job_id, commit_hash, start_time) VALUES (?, ?, ?);"
Using.resource(conn.prepareStatement(sql)) { stmt =>
stmt.setString(1, data.jobId)
stmt.setString(2, data.commitHash)
stmt.setLong(3, data.startTime)
stmt.executeUpdate()
}
}
private def insertBuildSteps(conn: Connection, steps: Seq[BuildStep]): Unit = {
val sql = "INSERT INTO build_steps (name, duration_ms, status) VALUES (?, ?, ?);"
Using.resource(conn.prepareStatement(sql)) { stmt =>
for (step <- steps) {
stmt.setString(1, step.name)
stmt.setLong(2, step.durationMs)
stmt.setString(3, step.status)
stmt.addBatch()
}
stmt.executeBatch()
}
}
private def insertTestResults(conn: Connection, results: Seq[TestResult]): Unit = {
val sql = "INSERT INTO test_results (suite, name, duration_ms, result) VALUES (?, ?, ?, ?);"
Using.resource(conn.prepareStatement(sql)) { stmt =>
for (res <- results) {
stmt.setString(1, res.suite)
stmt.setString(2, res.name)
stmt.setLong(3, res.durationMs)
stmt.setString(4, res.result)
stmt.addBatch()
}
stmt.executeBatch()
}
}
}
这段代码的核心思想是:每个 Spark task 都是一个独立的事务单元。它使用 JDBC 连接到一个本地 SQLite 文件,在事务中执行所有数据库写入操作。如果任何一步出错,整个事务会回滚,并且上层逻辑会捕获异常,最终导致 Spark task 失败。只有当所有数据都成功写入并commit
后,这个 .sqlite
文件才被视为一个有效的、完整的产物。
产物管理:Git LFS 的自动化脚本
当 Spark 作业成功完成后,Driver 节点会收集所有 Executor 节点生成的 .sqlite
文件路径。接着,一个 post-processing 脚本会被触发,负责将这些文件提交到我们的数据仓库。
#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.
# 这些变量由 Spark Driver 作业传入
COMMIT_HASH="$1"
SPARK_OUTPUT_PATHS_FILE="$2" # 一个包含所有 .sqlite 文件路径的文本文件
DATA_REPO_PATH="/path/to/local/clone/of/data-repo"
if [ -z "$COMMIT_HASH" ] || [ -z "$SPARK_OUTPUT_PATHS_FILE" ]; then
echo "Usage: $0 <commit_hash> <spark_output_paths_file>"
exit 1
fi
cd "$DATA_REPO_PATH"
# 1. 确保数据仓库是最新的
git pull origin main --rebase
# 2. 创建一个以 commit hash 命名的子目录,用于存放本次运行的所有产物
TARGET_DIR="${DATA_REPO_PATH}/${COMMIT_HASH}"
mkdir -p "$TARGET_DIR"
# 3. 从路径文件中读取并移动所有 .sqlite 文件
while IFS= read -r sqlite_path; do
if [ -f "$sqlite_path" ]; then
echo "Moving $sqlite_path to $TARGET_DIR/"
mv "$sqlite_path" "$TARGET_DIR/"
else
echo "Warning: File not found: $sqlite_path"
fi
done < "$SPARK_OUTPUT_PATHS_FILE"
# 4. 使用 Git LFS 来追踪 .sqlite 文件
# 这个配置只需要在仓库初始化时做一次
# git lfs track "*.sqlite"
# git add .gitattributes
# 5. 将新产物添加到 Git
git add "${COMMIT_HASH}/"
# 6. 提交并推送
# 这里的 git user 需要配置为机器人账户
git commit -m "Add CI artifacts for commit ${COMMIT_HASH}"
git push origin main
echo "Successfully committed CI artifacts for ${COMMIT_HASH} to data repo."
这个脚本的逻辑很直白:为每次 CI 运行(由 COMMIT_HASH
标识)创建一个目录,将所有相关的 SQLite 文件移入,然后使用 git add
和 git commit
将它们纳入版本控制。由于我们预先配置了 Git LFS 追踪 .sqlite
文件,这些大文件实际上会被上传到 LFS 服务器,而 Git 仓库中只保留轻量的指针文件。
前端消费:Turbopack 赋能的快速迭代
数据管道的最后一环是展示。我们的开发者看板是一个 React 应用,需要处理和可视化大量数据。
一个典型的场景是展示某个 CI job 的构建步骤耗时分布饼图。
// package.json (scripts section)
"scripts": {
"dev": "turbo dev", // 使用 Turbopack 启动开发服务器
"build": "turbo build",
"start": "turbo start"
},
使用 Turbopack 的好处在开发阶段体现得淋漓尽致。当我们调整 D3.js 的可视化逻辑或数据转换函数时,页面几乎是瞬时更新,无需等待漫长的重新打包。这对于需要频繁微调视觉效果的数据可视化开发来说,是颠覆性的体验提升。
// Simplified React component to fetch and display build steps
import React, { useState, useEffect } from 'react';
// 假设使用 chart.js 或 D3 来绘图
import { Pie } from 'react-chartjs-2';
function BuildStepChart({ commitHash, jobId }) {
const [chartData, setChartData] = useState(null);
const [error, setError] = useState(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
const fetchData = async () => {
setLoading(true);
try {
// API 服务会处理 git checkout 和 sqlite query 的逻辑
const response = await fetch(`/api/artifacts/${commitHash}/${jobId}/build_steps`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json(); // API 返回 [{name, duration_ms, status}]
// 转换数据为图表库需要的格式
setChartData({
labels: data.map(d => d.name),
datasets: [
{
label: 'Build Step Duration (ms)',
data: data.map(d => d.duration_ms),
backgroundColor: [
'#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF'
],
},
],
});
} catch (e) {
setError(e.message);
} finally {
setLoading(false);
}
};
fetchData();
}, [commitHash, jobId]);
if (loading) return <div>Loading chart...</div>;
if (error) return <div>Error: {error}</div>;
if (!chartData) return <div>No data available.</div>;
return (
<div>
<h3>Build Steps for {jobId}</h3>
<Pie data={chartData} />
</div>
);
}
这个组件背后是一个简单的 API 服务,它接收 commitHash
和 jobId
,然后执行 git checkout
操作检出对应的数据文件,连接 SQLite 数据库,执行 SELECT * FROM build_steps;
,并将结果序列化为 JSON 返回。
局限性与未来展望
这套架构并非没有缺点。一个显而易见的局限是跨 CI 运行的聚合查询性能。虽然查询单个 SQLite 文件非常快,但如果要分析“过去一个月所有失败测试的 Top 10”,就需要检出成千上万个 Git LFS 文件并逐个查询,效率极低。
为了解决这个问题,我们正在实施一个二级异步流程:每天晚上,一个独立的 Spark 作业会扫描当天新生成的 SQLite 产物,将它们的数据提取、合并,然后批量导入到一个真正的列式分析数据库(我们选择了 ClickHouse)中。这为我们提供了两全其美的方案:为开发者提供即时的、原子化的单次运行反馈,同时为平台维护者和数据分析师提供高效的、大规模的历史趋势分析能力。
另一个潜在问题是 Git LFS 仓库的膨胀。我们必须制定严格的数据保留策略和归档机制,例如,只保留近 3 个月的热数据在主 LFS 服务器上,更早的数据则归档到成本更低的冷存储中。
尽管存在这些挑战,但这套结合了 Spark、SQLite 和 Git 的非典型数据管道,在生产环境中被证明是极其有效的。它为我们混乱的 CI 流程带来了秩序、原子性和可追溯性,最终显著提升了开发团队定位和解决性能问题的效率。