基于 AWS EKS 构建利用 Memcached 加速状态流转的 Saga 分布式事务协调器


一个看似简单的下单操作,背后联动了订单、库存、积分三个微服务。在一次大促压测中,积分服务因为负载过高出现短暂不可用,导致大量用户的订单创建成功、库存也扣减了,但积分发放失败。数据不一致的烂摊子,花了我和团队整整两天时间手动修复,这个教训足够深刻。传统的两阶段提交(2PC)在微服务架构下,由于其同步阻塞和对资源长时间的锁定,显然不是一个可行的选项。我们需要的是一个既能保证最终一致性,又对性能影响最小化的方案。Saga 模式进入了我们的视野。

我们决定自建一个轻量级的 Saga 协调器,它必须满足几个严苛的条件:

  1. 高性能: 事务协调的开销必须足够低,不能成为系统瓶颈。
  2. 高可用: 协调器本身不能是单点故障。
  3. 可观测与可干预: 必须有一个界面能清晰地看到所有正在进行中的事务状态,并在必要时进行人工干预。

技术选型决策过程很快:我们的整个技术栈都在 AWS EKS 上,所以协调器自然也应该以容器化服务的方式部署在 Kubernetes 集群中。为了追求极致的性能,我们做了一个大胆的决定:使用 Memcached 作为 Saga 事务日志的主存储。这是一个权衡,我们用 Memcached 的高速读写来加速事务状态的流转,代价是数据易失性。我们的假设是,绝大多数事务都能在几十秒内完成,这种短生命周期的状态数据没必要一开始就落盘到慢速的数据库。同时,我们为协调器设计了异步的快照机制,将终态数据备份到更持久的存储中,用于审计和灾难恢复。

前端监控面板,我们选择了团队最熟悉的 Ant Design,它能快速构建出专业且功能强大的后台界面。

架构设计与核心组件

整个系统的核心是一个无状态的 Saga Coordinator 服务,它可以水平扩展部署在 EKS 上。所有参与分布式事务的业务服务(Participants)只需要实现自己的业务操作和补偿操作接口即可。

graph TD
    subgraph "AWS EKS Cluster"
        direction LR
        subgraph "Saga Coordinator Pods"
            Coord1[Coordinator 1]
            Coord2[Coordinator 2]
            Coord3[Coordinator 3]
        end
        
        subgraph "Business Service Pods"
            OrderSvc[Order Service]
            StockSvc[Stock Service]
            PaymentSvc[Payment Service]
        end

        subgraph "Caching Layer"
            Memcached[Memcached Cluster]
        end
        
        subgraph "Monitoring UI"
            AntD[Ant Design Dashboard]
        end
    end

    Client[Client Request] --> OrderSvc
    OrderSvc -- 1. StartSaga --> Coord1
    Coord1 -- 2. Store Saga Log --> Memcached
    Coord1 -- 3. Execute Step 1 (Create Order) --> OrderSvc
    OrderSvc -- 4. Step 1 Success --> Coord1
    Coord1 -- 5. Update Saga Log --> Memcached
    Coord1 -- 6. Execute Step 2 (Deduct Stock) --> StockSvc
    StockSvc -- 7. Step 2 Success --> Coord1
    Coord1 -- 8. Update Saga Log --> Memcached
    Coord1 -- 9. Execute Step 3 (Process Payment) --> PaymentSvc
    PaymentSvc -- 10. Step 3 Success --> Coord1
    Coord1 -- 11. Mark Saga Success --> Memcached

    AntD -- Periodically Poll / Watch --> Coord2
    Coord2 -- Read Saga State --> Memcached

Saga Coordinator 的 Go 语言实现

我们选择 Go 语言来实现协调器,因为它在并发处理和网络编程方面表现出色,非常适合这类中间件的开发。

首先定义 Saga 的核心数据结构。一个 Saga 实例由多个步骤(Step)组成,每个步骤都包含一个正向操作(Action)和一个补偿操作(Compensation)。

// pkg/saga/types.go

package saga

import (
	"encoding/json"
	"time"
)

// SagaStatus 定义了 Saga 事务的状态
type SagaStatus string

const (
	StatusRunning      SagaStatus = "RUNNING"
	StatusCompleted    SagaStatus = "COMPLETED"
	StatusCompensating SagaStatus = "COMPENSATING"
	StatusFailed       SagaStatus = "FAILED"
)

// StepStatus 定义了单个步骤的状态
type StepStatus string

const (
	StepStatusPending    StepStatus = "PENDING"
	StepStatusSuccess    StepStatus = "SUCCESS"
	StepStatusFailed     StepStatus = "FAILED"
	StepStatusCompensated StepStatus = "COMPENSATED"
)

// Step 定义了 Saga 中的一个步骤
type Step struct {
	Name          string          `json:"name"`
	Action        Endpoint        `json:"action"`
	Compensation  Endpoint        `json:"compensation"`
	Status        StepStatus      `json:"status"`
	Payload       json.RawMessage `json:"payload,omitempty"`
	Retries       int             `json:"retries"`
	FailureReason string          `json:"failure_reason,omitempty"`
}

// Endpoint 定义了需要调用的服务接口
type Endpoint struct {
	Method string `json:"method"` // e.g., "POST", "PUT"
	URL    string `json:"url"`
}

// Saga 定义了一个完整的分布式事务实例
type Saga struct {
	ID          string     `json:"id"`
	Name        string     `json:"name"`
	Steps       []Step     `json:"steps"`
	CurrentStep int        `json:"current_step"`
	Status      SagaStatus `json:"status"`
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	Context     json.RawMessage `json:"context"` // 全局上下文,用于在步骤间传递数据
}

接下来是与 Memcached 交互的核心逻辑。我们使用 bradfitz/gomemcache 这个库,并封装一层来处理序列化和反序列化。这里的坑在于,Memcached 的 key 长度有限制(通常是250字节),并且 value 大小也有限制(默认1MB),在设计时需要考虑。

// pkg/store/memcached_store.go

package store

import (
	"encoding/json"
	"fmt"
	"time"
	
	"github.com/bradfitz/gomemcache/memcache"
	"github.com/my-org/saga-coordinator/pkg/saga"
)

const (
	// Saga 事务在缓存中的过期时间,例如 24 小时
	// 确保这个时间足够长,以覆盖任何可能的长时间运行的事务
	sagaExpiration = 24 * 60 * 60
)

type MemcachedStore struct {
	client *memcache.Client
}

func NewMemcachedStore(servers ...string) (*MemcachedStore, error) {
	if len(servers) == 0 {
		return nil, fmt.Errorf("memcached servers cannot be empty")
	}
	mc := memcache.New(servers...)
	// 添加一个简单的 ping 来验证连接
	if err := mc.Ping(); err != nil {
		return nil, fmt.Errorf("failed to ping memcached servers: %w", err)
	}
	return &MemcachedStore{client: mc}, nil
}

// GetSaga 从 Memcached 中获取 Saga 实例
func (s *MemcachedStore) GetSaga(sagaID string) (*saga.Saga, error) {
	item, err := s.client.Get(sagaID)
	if err != nil {
		if err == memcache.ErrCacheMiss {
			return nil, fmt.Errorf("saga with id %s not found: %w", sagaID, err)
		}
		return nil, fmt.Errorf("failed to get saga %s from memcached: %w", sagaID, err)
	}

	var sag saga.Saga
	if err := json.Unmarshal(item.Value, &sag); err != nil {
		return nil, fmt.Errorf("failed to unmarshal saga %s: %w", sagaID, err)
	}
	return &sag, nil
}

// SaveSaga 将 Saga 实例保存到 Memcached
func (s *MemcachedStore) SaveSaga(sag *saga.Saga) error {
	sag.UpdatedAt = time.Now()
	data, err := json.Marshal(sag)
	if err != nil {
		return fmt.Errorf("failed to marshal saga %s: %w", sag.ID, err)
	}
    
    // 这里的 value 大小需要监控,如果接近 1MB 限制,需要考虑对 payload 进行压缩或拆分
	item := &memcache.Item{
		Key:        sag.ID,
		Value:      data,
		Expiration: sagaExpiration,
	}

	if err := s.client.Set(item); err != nil {
		return fmt.Errorf("failed to set saga %s to memcached: %w", sag.ID, err)
	}
	return nil
}

Saga 协调器的核心逻辑是状态机引擎。它接收启动请求,然后一步步执行,根据每一步的结果来决定是继续前进还是转入补偿流程。

// pkg/coordinator/coordinator.go

package coordinator

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/google/uuid"
	"github.com/my-org/saga-coordinator/pkg/saga"
	"github.com/my-org/saga-coordinator/pkg/store"
)

type Coordinator struct {
	store      store.SagaStore
	httpClient *http.Client
}

// ... NewCoordinator constructor ...

// StartSaga a new transaction
func (c *Coordinator) StartSaga(name string, steps []saga.Step, initialContext json.RawMessage) (*saga.Saga, error) {
	s := &saga.Saga{
		ID:          uuid.New().String(),
		Name:        name,
		Steps:       steps,
		CurrentStep: 0,
		Status:      saga.StatusRunning,
		CreatedAt:   time.Now(),
		Context:     initialContext,
	}

	for i := range s.Steps {
		s.Steps[i].Status = saga.StepStatusPending
	}

	log.Printf("Starting new saga %s (%s)", s.Name, s.ID)
	if err := c.store.SaveSaga(s); err != nil {
		return nil, err
	}

	// 异步触发第一步
	go c.executeNextStep(s.ID)
	return s, nil
}

// executeNextStep is the core engine loop
func (c *Coordinator) executeNextStep(sagaID string) {
	s, err := c.store.GetSaga(sagaID)
	if err != nil {
		log.Printf("Error fetching saga %s for execution: %v", sagaID, err)
		return
	}

	if s.Status != saga.StatusRunning {
		log.Printf("Saga %s is not in RUNNING state, aborting execution.", sagaID)
		return
	}

	if s.CurrentStep >= len(s.Steps) {
		s.Status = saga.StatusCompleted
		log.Printf("Saga %s completed successfully.", sagaID)
		_ = c.store.SaveSaga(s)
		return
	}

	step := &s.Steps[s.CurrentStep]
	log.Printf("Executing step '%s' for saga %s", step.Name, sagaID)

    // 在真实项目中,这里应该使用更健壮的 HTTP 客户端,并处理上下文传递 (e.g. tracing headers)
	reqBody, _ := json.Marshal(step.Payload)
	req, _ := http.NewRequest(step.Action.Method, step.Action.URL, bytes.NewBuffer(reqBody))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Saga-ID", sagaID) // 传递Saga ID

	resp, err := c.httpClient.Do(req)
	if err != nil || resp.StatusCode >= 400 {
		log.Printf("Step '%s' for saga %s failed. Status: %d, Error: %v", step.Name, sagaID, resp.StatusCode, err)
		step.Status = saga.StepStatusFailed
		s.Status = saga.StatusCompensating
		_ = c.store.SaveSaga(s)
		go c.triggerCompensation(sagaID)
		return
	}
	defer resp.Body.Close()

	step.Status = saga.StepStatusSuccess
	s.CurrentStep++
	_ = c.store.SaveSaga(s)

	// Immediately trigger the next step
	c.executeNextStep(sagaID)
}

func (c *Coordinator) triggerCompensation(sagaID string) {
	s, err := c.store.GetSaga(sagaID)
	if err != nil {
		log.Printf("Error fetching saga %s for compensation: %v", sagaID, err)
		return
	}
	
	if s.Status != saga.StatusCompensating {
		log.Printf("Saga %s is not in COMPENSATING state, aborting compensation.", sagaID)
		return
	}
	
	log.Printf("Starting compensation for saga %s", sagaID)
	
	// 从后往前执行已成功的步骤的补偿操作
	for i := s.CurrentStep - 1; i >= 0; i-- {
		step := &s.Steps[i]
		if step.Status != saga.StepStatusSuccess {
			continue
		}
		
		log.Printf("Compensating step '%s' for saga %s", step.Name, sagaID)
		reqBody, _ := json.Marshal(step.Payload)
		req, _ := http.NewRequest(step.Compensation.Method, step.Compensation.URL, bytes.NewBuffer(reqBody))
		req.Header.Set("Content-Type", "application/json")
		
        // 补偿操作必须幂等,并且尽可能保证成功。
        // 在真实项目中,这里需要有重试机制和失败告警。
		_, err := c.httpClient.Do(req)
		if err != nil {
			log.Printf("CRITICAL: Compensation for step '%s' failed for saga %s. Manual intervention required. Error: %v", step.Name, sagaID, err)
			s.Status = saga.StatusFailed // 标记为最终失败
			_ = c.store.SaveSaga(s)
			return
		}
		
		step.Status = saga.StepStatusCompensated
		_ = c.store.SaveSaga(s)
	}
	
	s.Status = saga.StatusFailed
	log.Printf("Saga %s compensation finished, marked as FAILED.", sagaID)
	_ = c.store.SaveSaga(s)
}

部署到 AWS EKS

将协调器服务和 Memcached 部署到 EKS 集群非常直接。

协调器的 deployment.yaml

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: saga-coordinator
  namespace: core
spec:
  replicas: 3
  selector:
    matchLabels:
      app: saga-coordinator
  template:
    metadata:
      labels:
        app: saga-coordinator
    spec:
      containers:
      - name: coordinator
        image: your-account.dkr.ecr.us-east-1.amazonaws.com/saga-coordinator:latest
        ports:
        - containerPort: 8080
        env:
        - name: MEMCACHED_SERVERS
          value: "memcached-service.cache.svc.cluster.local:11211"
        readinessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20
        resources:
          requests:
            cpu: "200m"
            memory: "256Mi"
          limits:
            cpu: "500m"
            memory: "512Mi"
---
apiVersion: v1
kind: Service
metadata:
  name: saga-coordinator-service
  namespace: core
spec:
  selector:
    app: saga-coordinator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080

我们使用 Bitnami 的 Helm chart 来部署 Memcached 集群,这比手动管理 statefulset 要简单得多。

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install memcached bitnami/memcached --namespace cache --create-namespace

这会在 cache 命名空间中创建一个名为 memcached-service 的服务,我们的协调器通过这个内部 DNS 名称来连接它。

Ant Design 监控面板

一个好的 UI 对于运维至关重要。我们需要一个界面来实时查看所有 Saga 的状态,并对失败的事务进行干预。Ant Design Pro 的脚手架能让我们快速搭建起原型。

核心是 Saga 列表页,用 Table 组件展示,并通过 Steps 组件可视化每个事务的进度。

// src/pages/SagaList/index.jsx
import React, { useState, useEffect } from 'react';
import { Table, Tag, Steps, Button, Modal, message } from 'antd';
import { fetchSagas, retrySagaStep, compensateSaga } from '@/services/saga';

const { Step } = Steps;

const statusColorMap = {
  RUNNING: 'processing',
  COMPLETED: 'success',
  COMPENSATING: 'warning',
  FAILED: 'error',
};

const SagaList = () => {
  const [sagas, setSagas] = useState([]);
  const [loading, setLoading] = useState(true);

  const loadSagas = async () => {
    setLoading(true);
    try {
      const response = await fetchSagas(); // API call to coordinator
      setSagas(response.data);
    } catch (error) {
      message.error('Failed to load sagas');
    } finally {
      setLoading(false);
    }
  };

  useEffect(() => {
    loadSagas();
    const interval = setInterval(loadSagas, 5000); // 5秒轮询一次
    return () => clearInterval(interval);
  }, []);

  const handleManualCompensate = (sagaId) => {
    Modal.confirm({
      title: 'Are you sure you want to manually trigger compensation?',
      content: `This will start the compensation process for saga: ${sagaId}`,
      onOk: async () => {
        await compensateSaga(sagaId);
        message.success('Compensation triggered');
        loadSagas();
      },
    });
  };

  const columns = [
    { title: 'Saga ID', dataIndex: 'id', key: 'id', width: 300 },
    { title: 'Name', dataIndex: 'name', key: 'name' },
    {
      title: 'Status',
      dataIndex: 'status',
      key: 'status',
      render: (status) => <Tag color={statusColorMap[status]}>{status}</Tag>,
    },
    { title: 'Created At', dataIndex: 'created_at', key: 'created_at' },
    {
      title: 'Actions',
      key: 'actions',
      render: (_, record) => (
        record.status === 'RUNNING' && (
          <Button type="primary" danger onClick={() => handleManualCompensate(record.id)}>
            Force Compensate
          </Button>
        )
      ),
    },
  ];
  
  const expandedRowRender = (saga) => {
    return (
      <div>
        <h4>Saga Progress:</h4>
        <Steps current={saga.current_step} status={saga.status === 'FAILED' ? 'error' : 'process'}>
          {saga.steps.map((step) => (
            <Step key={step.name} title={step.name} description={`Status: ${step.status}`} />
          ))}
        </Steps>
      </div>
    );
  };

  return (
    <Table
      columns={columns}
      dataSource={sagas}
      loading={loading}
      rowKey="id"
      expandable={{ expandedRowRender }}
    />
  );
};

export default SagaList;

这个界面虽然简单,但提供了核心功能:查看所有事务的列表、状态,并能展开查看每个事务的详细步骤进度。最重要的是,它提供了人工干预的入口,当自动补偿机制因网络等问题失败时,运维人员可以点击“强制补偿”按钮来重新触发补偿流程,这是保证系统鲁棒性的最后一道防线。

局限性与未来展望

这个基于 Memcached 的方案在性能上表现优异,协调器本身的开销极小,能够支撑高并发的事务请求。然而,它的核心弱点也正是其优势的来源——数据的易失性。如果 Coordinator Pod 和 Memcached Pod 同时发生故障,我们可能会丢失正在进行中的事务状态,导致数据不一致。虽然这种情况在 EKS 的多可用区部署下概率极低,但风险依然存在。

为了缓解这个问题,下一步的迭代计划是引入一个异步的、旁路的持久化机制。协调器在每次更新 Memcached 状态后,可以将 Saga 的变更事件(如 StepSuccess, SagaCompleted)发送到 Kafka 或 AWS SQS 队列中。一个独立的消费者服务可以从队列中读取这些事件,并将其持久化到 DynamoDB 或 PostgreSQL 中。这样,主流程的性能不受影响,同时我们拥有了用于灾难恢复和事后审计的完整事务日志。

另一个可以优化的方向是,对于补偿流程,目前简单的 HTTP 调用和重试逻辑可能不够健壮。可以考虑引入更成熟的重试库,如 backoff,并对补偿失败的事务进行更精细化的告警,直接推送到 SRE 的告警平台,而不是仅仅依赖于人工在 Dashboard 上发现。


  目录