Building an Event-Driven CI Analysis Pipeline on NATS to Decouple Unit Test and Dependency Scan Workloads


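Our team's internal CI pipeline had become as slow as a beached whale. For a typical Go project, a single commit triggers a gitlab-ci.yml that runs unit tests, coverage calculation, dependency vulnerability scanning, and static analysis in series, then builds and pushes an image. The whole run averages 15 to 20 minutes. The worst part is how fragile it is: one network blip in the dependency scanning service can stall or fail the entire pipeline, so developers have to retry by hand and the feedback loop stretches out badly.

This monolithic, sequential pipeline is a breeding ground for technical debt. Every new quality gate, such as a new linter, means editing the CI/CD configuration of dozens of projects, and the total run time keeps growing. What we need is not more patching of the existing setup but a fundamental decoupling.

The initial idea is to move to an event-driven model. When a commit lands, the CI system does exactly one thing: publish a "code committed" event. All subsequent analysis, test, and scan tasks then run as independent, horizontally scalable subscriber services that process the event concurrently. Total time is then bounded by the slowest task rather than by the sum of all tasks. More importantly, adding a new task only requires deploying a new subscriber, with no changes to the existing flow.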

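For technology selection we compared Kafka, RabbitMQ, and NATS. Kafka is powerful, but too heavyweight for this scenario and costly to operate. RabbitMQ is a solid option, but we preferred the simplicity and performance of NATS. Core NATS provides the fire-and-forget publish/subscribe model we need, the client is lightweight, and its Go support is exemplary. This is the pragmatic choice: solve the core problem with the simplest tool and avoid unnecessary complexity.

The final architecture blueprint looks like this: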

graph TD
    A[GitLab Webhook] -- on push --> B(Event Publisher);
    B -- publishes 'vcs.commit.received' event --> C{NATS};
    C -- fan-out --> D[Unit Test Worker];
    C -- fan-out --> E[Dependency Scan Worker];
    C -- fan-out --> F[Linter Worker ...];

    D -- publishes 'analysis.result.unittest' --> C;
    E -- publishes 'analysis.result.depscan' --> C;
    F -- publishes 'analysis.result.linter' --> C;

    G[Result Aggregator] -- subscribes 'analysis.result.*' --> C;
    G -- updates commit status --> H[GitLab API];

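The NATS broker is the core of this architecture and acts as the traffic hub. Every component is decoupled through it and can evolve and be deployed independently.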
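
The Result Aggregator in the diagram relies on a subject wildcard (analysis.result.*). As a rough illustration of how such a pattern selects subjects, the sketch below models the NATS wildcard rules in plain Go: "*" matches exactly one token and ">" matches one or more trailing tokens. The subjectMatches function is a hypothetical helper for explanation only; the real matching is done by the NATS server.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a dot-separated subject matches a pattern.
// "*" matches exactly one token; ">" is only honored as the last token and
// matches one or more remaining tokens. Simplified model, not the server code.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	for _, subject := range []string{
		"analysis.result.unittest",
		"analysis.result.depscan",
		"vcs.commit.received",
	} {
		fmt.Println(subject, subjectMatches("analysis.result.*", subject))
	}
}
```

One consequence worth noting: a subscriber on analysis.result.* would not see a deeper subject such as analysis.result.unittest.retry, so result subjects should stay exactly three tokens long unless the aggregator switches to analysis.result.> instead.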

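Step 1: Define the event contracts

In an event-driven system, the structure of an event is the API contract between services. We define ours in JSON to keep them simple and readable.

This is the structure of the "code committed" event, published on the vcs.commit.received subject: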

{
  "eventId": "uuid-v4-string",
  "timestamp": "2023-11-16T15:50:00Z",
  "repository": {
    "url": "[email protected]:my-team/my-service.git",
    "name": "my-service"
  },
  "commit": {
    "hash": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
    "author": "[email protected]",
    "message": "feat: Implement new feature"
  },
  "metadata": {
    "source": "gitlab-webhook"
  }
}

When an analysis task finishes, each worker publishes a result event on the analysis.result.{task_type} subject, for example analysis.result.unittest:

{
  "eventId": "another-uuid-v4-string",
  "correlationId": "original-event-id",
  "commitHash": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
  "taskType": "unit_test", // "dependency_scan", "linter"
  "status": "success", // "failure", "error"
  "timestamp": "2023-11-16T15:55:00Z",
  "payload": {
    // task-specific data
  }
}

The correlationId is essential here: it links back to the original commit event, which makes tracing and aggregation straightforward.
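
To show why the correlationId matters for aggregation, here is a minimal sketch of how a Result Aggregator might group results and derive one overall status per commit event. The Aggregator type, its method names, and the "pending" / "failed" status strings are assumptions made for illustration; the article does not specify the aggregator's implementation.

```go
package main

import "fmt"

// Result is a trimmed-down view of the result event; the fields mirror the contract above.
type Result struct {
	CorrelationID string
	TaskType      string
	Status        string // "success", "failure", "error"
}

// Aggregator collects results per correlationId (hypothetical helper).
type Aggregator struct {
	expected []string                     // task types required before a verdict
	seen     map[string]map[string]string // correlationId -> taskType -> status
}

func NewAggregator(expected ...string) *Aggregator {
	return &Aggregator{expected: expected, seen: make(map[string]map[string]string)}
}

// Add records a result and returns the overall status for that commit event:
// "pending" until every expected task has reported, then "success" or "failed".
func (a *Aggregator) Add(r Result) string {
	if a.seen[r.CorrelationID] == nil {
		a.seen[r.CorrelationID] = make(map[string]string)
	}
	a.seen[r.CorrelationID][r.TaskType] = r.Status

	overall := "success"
	for _, t := range a.expected {
		status, ok := a.seen[r.CorrelationID][t]
		switch {
		case !ok:
			return "pending"
		case status != "success":
			overall = "failed"
		}
	}
	return overall
}

func main() {
	agg := NewAggregator("unit_test", "dependency_scan")
	fmt.Println(agg.Add(Result{"evt-1", "unit_test", "success"}))
	fmt.Println(agg.Add(Result{"evt-1", "dependency_scan", "failure"}))
}
```

This version keeps everything in memory and never evicts old entries, so a real aggregator would also need a timeout for tasks that never report.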

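Step 2: Build the core worker, the unit test runner

This is the first and most important worker in the system. Its job is to subscribe to vcs.commit.received, clone the specified repository at the given commit, run go test, and publish the result back to NATS.

Below is the core Go code of unit-test-worker. In a real project the configuration should come from environment variables or a config file; it is hard-coded here for clarity.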

main.go:

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
)

// Event structures matching our JSON contract
type CommitEvent struct {
	EventID    string    `json:"eventId"`
	Timestamp  time.Time `json:"timestamp"`
	Repository struct {
		URL  string `json:"url"`
		Name string `json:"name"`
	} `json:"repository"`
	Commit struct {
		Hash    string `json:"hash"`
		Author  string `json:"author"`
		Message string `json:"message"`
	} `json:"commit"`
}

type AnalysisResultEvent struct {
	EventID       string      `json:"eventId"`
	CorrelationID string      `json:"correlationId"`
	CommitHash    string      `json:"commitHash"`
	TaskType      string      `json:"taskType"`
	Status        string      `json:"status"` // "success", "failure", "error"
	Timestamp     time.Time   `json:"timestamp"`
	Payload       interface{} `json:"payload"`
}

const (
	natsURL          = "nats://localhost:4222"
	subscribeSubject = "vcs.commit.received"
	publishSubject   = "analysis.result.unittest"
	workerGroupName  = "unittest-workers"
	taskType         = "unit_test"
)

func main() {
	// --- Graceful Shutdown Setup ---
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	shutdownChan := make(chan os.Signal, 1)
	signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-shutdownChan
		log.Println("Shutdown signal received, draining NATS connections...")
		cancel()
	}()

	// --- NATS Connection ---
	nc, err := nats.Connect(natsURL, nats.Name("Unit Test Worker"))
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Drain()

	log.Println("Connected to NATS, subscribing to subject:", subscribeSubject)

	// --- NATS Queue Subscription ---
	// Using a queue group ensures only one worker in the group receives a given message.
	// This is how we can scale out by running multiple instances of this worker.
	_, err = nc.QueueSubscribe(subscribeSubject, workerGroupName, func(msg *nats.Msg) {
		handleMessage(ctx, nc, msg)
	})
	if err != nil {
		log.Fatalf("Failed to subscribe: %v", err)
	}

	<-ctx.Done()
	log.Println("Worker shutting down.")
}

func handleMessage(ctx context.Context, nc *nats.Conn, msg *nats.Msg) {
	var event CommitEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		log.Printf("Error unmarshalling message: %v", err)
		// We should probably send this to a dead-letter queue.
		// For now, we just log it.
		return
	}

	log.Printf("Received commit %s for repo %s", event.Commit.Hash, event.Repository.Name)

	// In a real implementation, this would be a more robust execution engine.
	// It would handle cloning the repo into a temporary, isolated directory.
	// For this example, we'll simulate the process.
	runner := NewTestRunner(event.Repository.URL, event.Commit.Hash)
	
	// Use a context with timeout for the test execution itself
	runCtx, runCancel := context.WithTimeout(ctx, 10*time.Minute)
	defer runCancel()

	output, err := runner.Run(runCtx)

	result := AnalysisResultEvent{
		EventID:       uuid.NewString(),
		CorrelationID: event.EventID,
		CommitHash:    event.Commit.Hash,
		TaskType:      taskType,
		Timestamp:     time.Now().UTC(),
	}

	if err != nil {
		log.Printf("Test execution failed for commit %s: %v", event.Commit.Hash, err)
		result.Status = "failure"
		result.Payload = map[string]string{
			"error":  err.Error(),
			"output": output,
		}
	} else {
		log.Printf("Test execution successful for commit %s", event.Commit.Hash)
		result.Status = "success"
		result.Payload = map[string]string{
			"output": output,
		}
	}
	
	publishResult(nc, result)
}

func publishResult(nc *nats.Conn, result AnalysisResultEvent) {
	payload, err := json.Marshal(result)
	if err != nil {
		log.Printf("Failed to marshal result event: %v", err)
		return
	}
	
	if err := nc.Publish(publishSubject, payload); err != nil {
		log.Printf("Failed to publish result for commit %s: %v", result.CommitHash, err)
	} else {
		log.Printf("Successfully published result for commit %s", result.CommitHash)
	}
}

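Note the QueueSubscribe call; it is a key point. It creates a queue group, and NATS ensures that each message published to vcs.commit.received is delivered to only one subscriber in that group. This gives our worker horizontal scaling for free: start 10 unit-test-worker instances and they automatically form one consuming queue group that processes tasks concurrently. Workers of a different type subscribe with a different group name, so each group still receives its own copy of every event.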
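
The delivery semantics of a queue group can be modeled without NATS at all. The sketch below uses a shared Go channel as a stand-in for the queue group: several workers compete for messages and each message is handled exactly once. It is only a mental model of the behavior described above, not how NATS implements it, and the dispatch function is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch models queue-group delivery: every message is handed to exactly
// one of the workers. It returns how many messages each worker handled.
func dispatch(messages []string, workers int) []int {
	queue := make(chan string)
	handled := make([]int, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				handled[id]++ // each goroutine only touches its own slot
			}
		}(w)
	}

	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	commits := []string{"a1b2c3", "d4e5f6", "0a1b2c", "3d4e5f", "6a7b8c"}
	counts := dispatch(commits, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("per-worker counts:", counts, "total handled:", total)
}
```

How the messages are spread across workers varies from run to run, but the total always equals the number of published messages.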

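Step 3: Abstract the execution logic and unit test it

The handleMessage function above calls runner.Run directly, but what is this TestRunner? A common mistake is to put the git clone, go test, and exec.Command calls straight into the handler, which makes the code extremely hard to test.

We need to abstract away the external dependencies (the file system, git, and the go command).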

runner.go:

package main

import (
	"context"
	"fmt"
	"os"
	"os/exec"
)

// Executor defines an interface for running external commands.
// This is the key to making our code testable.
type Executor interface {
	CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd
}

// OSExecutor is the real implementation that calls os/exec.
type OSExecutor struct{}

func (e *OSExecutor) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd {
	return exec.CommandContext(ctx, name, arg...)
}

// TestRunner encapsulates the logic for running tests.
type TestRunner struct {
	repoURL  string
	commit   string
	executor Executor // Use the interface, not the concrete type
}

func NewTestRunner(repoURL, commit string) *TestRunner {
	return &TestRunner{
		repoURL:  repoURL,
		commit:   commit,
		executor: &OSExecutor{}, // Production code uses the real executor
	}
}

// Run clones the repo and executes tests.
func (r *TestRunner) Run(ctx context.Context) (string, error) {
	// Create a temporary directory for the clone
	tempDir, err := os.MkdirTemp("", "test-runner-*")
	if err != nil {
		return "", fmt.Errorf("failed to create temp dir: %w", err)
	}
	defer os.RemoveAll(tempDir) // Clean up afterwards

	// 1. Git Clone
	cloneCmd := r.executor.CommandContext(ctx, "git", "clone", r.repoURL, tempDir)
	if output, err := cloneCmd.CombinedOutput(); err != nil {
		return string(output), fmt.Errorf("git clone failed: %w", err)
	}

	// 2. Git Checkout specific commit
	checkoutCmd := r.executor.CommandContext(ctx, "git", "-C", tempDir, "checkout", r.commit)
	if output, err := checkoutCmd.CombinedOutput(); err != nil {
		return string(output), fmt.Errorf("git checkout failed: %w", err)
	}
	
	// 3. Run Go tests
	testCmd := r.executor.CommandContext(ctx, "go", "test", "-v", "./...")
	testCmd.Dir = tempDir // Run the command in the cloned repository directory
	
	output, err := testCmd.CombinedOutput()
	if err != nil {
		// Test failures also result in a non-zero exit code, which is an 'error' here.
		// We return the output so the caller can see the test results.
		return string(output), fmt.Errorf("go test command failed: %w", err)
	}
	
	return string(output), nil
}

With the Executor interface we have broken the hard-coded dependency on os/exec. We can now write a true unit test that needs no real git or go environment.

runner_test.go:

package main

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"strings"
	"testing"
)

// MockExecutor implements the Executor interface for testing.
type MockExecutor struct {
	// A function we can swap out in each test case to simulate different command results.
	CommandContextFunc func(ctx context.Context, name string, arg ...string) *exec.Cmd
}

func (m *MockExecutor) CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd {
	return m.CommandContextFunc(ctx, name, arg...)
}

// newTestCmd is a helper for creating mock exec.Cmd instances.
// It uses the 'test' binary which is a standard Go facility for mocking commands.
func newTestCmd(ctx context.Context, name string, arg ...string) *exec.Cmd {
	cs := []string{"-test.run=TestHelperProcess", "--", name}
	cs = append(cs, arg...)
	cmd := exec.CommandContext(ctx, os.Args[0], cs...)
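	// Inherit the parent's environment so that GIT_SHOULD_FAIL and
	// GO_TEST_SHOULD_FAIL, set via os.Setenv in the tests, reach the helper process.
	cmd.Env = append(os.Environ(), "GO_WANT_HELPER_PROCESS=1")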
	return cmd
}

// TestHelperProcess isn't a real test. It's used by the exec.Cmd created in tests
// to simulate the behavior of the command being mocked.
func TestHelperProcess(t *testing.T) {
	if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
		return
	}
	defer os.Exit(0)
	
	args := os.Args
	for len(args) > 0 {
		if args[0] == "--" {
			args = args[1:]
			break
		}
		args = args[1:]
	}

	if len(args) == 0 {
		fmt.Fprintln(os.Stderr, "No command")
		os.Exit(1)
	}
	
	cmd, args := args[0], args[1:]
	
	// Simulate git and go commands based on environment variables set in the test
	if cmd == "git" {
		if os.Getenv("GIT_SHOULD_FAIL") == "1" {
			fmt.Fprintln(os.Stderr, "simulated git error")
			os.Exit(1)
		}
		fmt.Fprintln(os.Stdout, "simulated git success")
	} else if cmd == "go" {
		if os.Getenv("GO_TEST_SHOULD_FAIL") == "1" {
			fmt.Fprintln(os.Stdout, "--- FAIL: TestMyFunction (0.01s)")
			fmt.Fprintln(os.Stderr, "test failed")
			os.Exit(1)
		}
		fmt.Fprintln(os.Stdout, "--- PASS: TestMyFunction (0.01s)")
		fmt.Fprintln(os.Stdout, "PASS")
	}
}


func TestRun_Success(t *testing.T) {
	mockExec := &MockExecutor{
		CommandContextFunc: func(ctx context.Context, name string, arg ...string) *exec.Cmd {
			// All commands should succeed
			os.Unsetenv("GIT_SHOULD_FAIL")
			os.Unsetenv("GO_TEST_SHOULD_FAIL")
			return newTestCmd(ctx, name, arg...)
		},
	}
	
	runner := &TestRunner{
		repoURL:  "fake-url",
		commit:   "fake-commit",
		executor: mockExec,
	}
	
	output, err := runner.Run(context.Background())
	if err != nil {
		t.Fatalf("Expected no error, got %v", err)
	}

	if !strings.Contains(output, "PASS") {
		t.Errorf("Expected output to contain 'PASS', but it didn't. Got: %s", output)
	}
}

func TestRun_GitCloneFails(t *testing.T) {
	mockExec := &MockExecutor{
		CommandContextFunc: func(ctx context.Context, name string, arg ...string) *exec.Cmd {
			if name == "git" && arg[0] == "clone" {
				os.Setenv("GIT_SHOULD_FAIL", "1")
			} else {
				os.Unsetenv("GIT_SHOULD_FAIL")
			}
			return newTestCmd(ctx, name, arg...)
		},
	}
	
	runner := &TestRunner{
		repoURL:  "fake-url",
		commit:   "fake-commit",
		executor: mockExec,
	}

	_, err := runner.Run(context.Background())
	if err == nil {
		t.Fatal("Expected an error, but got nil")
	}

	if !strings.Contains(err.Error(), "git clone failed") {
		t.Errorf("Expected error message to contain 'git clone failed', got '%v'", err)
	}
}

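The test looks complicated, but it follows a robust pattern: with the TestHelperProcess technique we fully control the standard output, standard error, and exit code of an exec.Cmd without running the real command, so we can exercise every logic branch of TestRunner.

Step 4: Implement the dependency scan worker

The dependency scan worker is structured much like the unit test worker. It subscribes to the same subject (under its own queue group name) but runs a different command, such as trivy fs or Go's official govulncheck.

Key code snippet: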

// Inside a DependencyScanRunner's Run method
func (r *DependencyScanRunner) Run(ctx context.Context) (string, error) {
	// ... git clone and checkout logic is the same ...

	// Run govulncheck
	scanCmd := r.executor.CommandContext(ctx, "govulncheck", "./...")
	scanCmd.Dir = tempDir
	
	output, err := scanCmd.CombinedOutput()
	// govulncheck exits with a non-zero code if vulnerabilities are found,
	// which is not a process error, but a failure state for our analysis.
	if err != nil {
		// Check if it's an actual execution error or vulnerabilities found.
		// A simple heuristic is to check if output is non-empty.
		if len(output) > 0 {
			// Vulnerabilities found, this is a "successful" scan but a "failed" result.
			return string(output), errors.New("vulnerabilities detected")
		}
		// The command itself failed to run.
		return string(output), fmt.Errorf("govulncheck command execution failed: %w", err)
	}
	
	return "No vulnerabilities found", nil
}

This worker publishes its results to analysis.result.depscan. Its implementation and tests follow the same pattern as unit-test-worker, which shows how reusable the architecture's patterns are.

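Limitations of the current design and future iterations

We now have a horizontally scalable, decoupled CI analysis pipeline. Adding a new task does not slow down existing ones, and a single task failing does not block the whole flow.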

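However, this V1 design has clear limitations. First, Core NATS offers at-most-once delivery: if every worker instance happens to be offline when a message is published, that message is lost for good. This is unacceptable in production. The next iteration must introduce NATS JetStream and use its persistent streams to get at-least-once delivery.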
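
One consequence of moving to at-least-once delivery is that a worker may see the same event more than once, so handlers need to be idempotent. A minimal sketch of one way to do that, deduplicating on the eventId from the event contract, is shown below. The Deduper type is a hypothetical helper; it is in-memory, unbounded, and local to one process, so a fleet of workers would need a shared store instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers which event IDs have already been processed.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime returns true exactly once per eventID; later calls with the
// same ID return false so the caller can skip the redelivered message.
func (d *Deduper) FirstTime(eventID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[eventID]; dup {
		return false
	}
	d.seen[eventID] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	for _, id := range []string{"evt-1", "evt-2", "evt-1"} {
		if d.FirstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```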

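Second, there are no dependencies between tasks. Today every task runs concurrently, but sometimes we want a directed acyclic graph (DAG), for example "trigger the image build only after the unit tests pass". That calls for a more elaborate event flow and possibly a workflow engine. One option is an orchestrator service that subscribes to all result events and publishes new task command events according to predefined rules.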
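
As a rough idea of what "predefined rules" could look like, the sketch below expresses each DAG edge as a rule that maps a task result to the next subject to publish. The Rule type, the nextSubjects function, and the task.command.* subject names are all hypothetical; the article does not define a command subject scheme.

```go
package main

import "fmt"

// Rule is one edge of the task graph: when AfterTask reports OnStatus,
// the orchestrator publishes a command on the Publish subject.
type Rule struct {
	AfterTask string // taskType from the result event
	OnStatus  string // "success", "failure", or "error"
	Publish   string // command subject to publish next (hypothetical naming)
}

// nextSubjects returns the command subjects triggered by one result event.
func nextSubjects(rules []Rule, taskType, status string) []string {
	var out []string
	for _, r := range rules {
		if r.AfterTask == taskType && r.OnStatus == status {
			out = append(out, r.Publish)
		}
	}
	return out
}

func main() {
	rules := []Rule{
		// "Only build the image after the unit tests pass."
		{AfterTask: "unit_test", OnStatus: "success", Publish: "task.command.build_image"},
	}
	fmt.Println(nextSubjects(rules, "unit_test", "success"))
	fmt.Println(nextSubjects(rules, "unit_test", "failure"))
}
```

A rule that needs several upstream tasks to succeed would have to track state per correlationId, which is where a proper workflow engine starts to pay off.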

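Finally, the assumptions about the execution environment are too simple. In a real deployment, running git clone and go test requires a secure, isolated environment (such as an ephemeral Docker container or a Firecracker microVM) rather than the worker's host machine. That means adding an executor abstraction layer that schedules tasks into dedicated sandboxes. This is a necessary step on the way from a simple model to a production-grade platform.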

