一个看似简单的下单操作,背后联动了订单、库存、积分三个微服务。在一次大促压测中,积分服务因为负载过高出现短暂不可用,导致大量用户的订单创建成功、库存也扣减了,但积分发放失败。数据不一致的烂摊子,花了我和团队整整两天时间手动修复,这个教训足够深刻。传统的两阶段提交(2PC)在微服务架构下,由于其同步阻塞和对资源长时间的锁定,显然不是一个可行的选项。我们需要的是一个既能保证最终一致性,又对性能影响最小化的方案。Saga 模式进入了我们的视野。
我们决定自建一个轻量级的 Saga 协调器,它必须满足几个严苛的条件:
- 高性能: 事务协调的开销必须足够低,不能成为系统瓶颈。
- 高可用: 协调器本身不能是单点故障。
- 可观测与可干预: 必须有一个界面能清晰地看到所有正在进行中的事务状态,并在必要时进行人工干预。
技术选型决策过程很快:我们的整个技术栈都在 AWS EKS 上,所以协调器自然也应该以容器化服务的方式部署在 Kubernetes 集群中。为了追求极致的性能,我们做了一个大胆的决定:使用 Memcached 作为 Saga 事务日志的主存储。这是一个权衡,我们用 Memcached 的高速读写来加速事务状态的流转,代价是数据易失性。我们的假设是,绝大多数事务都能在几十秒内完成,这种短生命周期的状态数据没必要一开始就落盘到慢速的数据库。同时,我们为协调器设计了异步的快照机制,将终态数据备份到更持久的存储中,用于审计和灾难恢复。
前端监控面板,我们选择了团队最熟悉的 Ant Design,它能快速构建出专业且功能强大的后台界面。
架构设计与核心组件
整个系统的核心是一个无状态的 Saga Coordinator 服务,它可以水平扩展部署在 EKS 上。所有参与分布式事务的业务服务(Participants)只需要实现自己的业务操作和补偿操作接口即可。
graph TD subgraph "AWS EKS Cluster" direction LR subgraph "Saga Coordinator Pods" Coord1[Coordinator 1] Coord2[Coordinator 2] Coord3[Coordinator 3] end subgraph "Business Service Pods" OrderSvc[Order Service] StockSvc[Stock Service] PaymentSvc[Payment Service] end subgraph "Caching Layer" Memcached[Memcached Cluster] end subgraph "Monitoring UI" AntD[Ant Design Dashboard] end end Client[Client Request] --> OrderSvc OrderSvc -- 1. StartSaga --> Coord1 Coord1 -- 2. Store Saga Log --> Memcached Coord1 -- 3. Execute Step 1 (Create Order) --> OrderSvc OrderSvc -- 4. Step 1 Success --> Coord1 Coord1 -- 5. Update Saga Log --> Memcached Coord1 -- 6. Execute Step 2 (Deduct Stock) --> StockSvc StockSvc -- 7. Step 2 Success --> Coord1 Coord1 -- 8. Update Saga Log --> Memcached Coord1 -- 9. Execute Step 3 (Process Payment) --> PaymentSvc PaymentSvc -- 10. Step 3 Success --> Coord1 Coord1 -- 11. Mark Saga Success --> Memcached AntD -- Periodically Poll / Watch --> Coord2 Coord2 -- Read Saga State --> Memcached
Saga Coordinator 的 Go 语言实现
我们选择 Go 语言来实现协调器,因为它在并发处理和网络编程方面表现出色,非常适合这类中间件的开发。
首先定义 Saga 的核心数据结构。一个 Saga 实例由多个步骤(Step)组成,每个步骤都包含一个正向操作(Action)和一个补偿操作(Compensation)。
// pkg/saga/types.go
package saga
import (
"encoding/json"
"time"
)
// SagaStatus 定义了 Saga 事务的状态
type SagaStatus string
const (
StatusRunning SagaStatus = "RUNNING"
StatusCompleted SagaStatus = "COMPLETED"
StatusCompensating SagaStatus = "COMPENSATING"
StatusFailed SagaStatus = "FAILED"
)
// StepStatus 定义了单个步骤的状态
type StepStatus string
const (
StepStatusPending StepStatus = "PENDING"
StepStatusSuccess StepStatus = "SUCCESS"
StepStatusFailed StepStatus = "FAILED"
StepStatusCompensated StepStatus = "COMPENSATED"
)
// Step 定义了 Saga 中的一个步骤
type Step struct {
Name string `json:"name"`
Action Endpoint `json:"action"`
Compensation Endpoint `json:"compensation"`
Status StepStatus `json:"status"`
Payload json.RawMessage `json:"payload,omitempty"`
Retries int `json:"retries"`
FailureReason string `json:"failure_reason,omitempty"`
}
// Endpoint 定义了需要调用的服务接口
type Endpoint struct {
Method string `json:"method"` // e.g., "POST", "PUT"
URL string `json:"url"`
}
// Saga 定义了一个完整的分布式事务实例
type Saga struct {
ID string `json:"id"`
Name string `json:"name"`
Steps []Step `json:"steps"`
CurrentStep int `json:"current_step"`
Status SagaStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Context json.RawMessage `json:"context"` // 全局上下文,用于在步骤间传递数据
}
接下来是与 Memcached 交互的核心逻辑。我们使用 bradfitz/gomemcache
这个库,并封装一层来处理序列化和反序列化。这里的坑在于,Memcached 的 key 长度有限制(通常是250字节),并且 value 大小也有限制(默认1MB),在设计时需要考虑。
// pkg/store/memcached_store.go
package store
import (
"encoding/json"
"fmt"
"time"
"github.com/bradfitz/gomemcache/memcache"
"github.com/my-org/saga-coordinator/pkg/saga"
)
const (
// Saga 事务在缓存中的过期时间,例如 24 小时
// 确保这个时间足够长,以覆盖任何可能的长时间运行的事务
sagaExpiration = 24 * 60 * 60
)
type MemcachedStore struct {
client *memcache.Client
}
func NewMemcachedStore(servers ...string) (*MemcachedStore, error) {
if len(servers) == 0 {
return nil, fmt.Errorf("memcached servers cannot be empty")
}
mc := memcache.New(servers...)
// 添加一个简单的 ping 来验证连接
if err := mc.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping memcached servers: %w", err)
}
return &MemcachedStore{client: mc}, nil
}
// GetSaga 从 Memcached 中获取 Saga 实例
func (s *MemcachedStore) GetSaga(sagaID string) (*saga.Saga, error) {
item, err := s.client.Get(sagaID)
if err != nil {
if err == memcache.ErrCacheMiss {
return nil, fmt.Errorf("saga with id %s not found: %w", sagaID, err)
}
return nil, fmt.Errorf("failed to get saga %s from memcached: %w", sagaID, err)
}
var sag saga.Saga
if err := json.Unmarshal(item.Value, &sag); err != nil {
return nil, fmt.Errorf("failed to unmarshal saga %s: %w", sagaID, err)
}
return &sag, nil
}
// SaveSaga 将 Saga 实例保存到 Memcached
func (s *MemcachedStore) SaveSaga(sag *saga.Saga) error {
sag.UpdatedAt = time.Now()
data, err := json.Marshal(sag)
if err != nil {
return fmt.Errorf("failed to marshal saga %s: %w", sag.ID, err)
}
// 这里的 value 大小需要监控,如果接近 1MB 限制,需要考虑对 payload 进行压缩或拆分
item := &memcache.Item{
Key: sag.ID,
Value: data,
Expiration: sagaExpiration,
}
if err := s.client.Set(item); err != nil {
return fmt.Errorf("failed to set saga %s to memcached: %w", sag.ID, err)
}
return nil
}
Saga 协调器的核心逻辑是状态机引擎。它接收启动请求,然后一步步执行,根据每一步的结果来决定是继续前进还是转入补偿流程。
// pkg/coordinator/coordinator.go
package coordinator
import (
"bytes"
"encoding/json"
"log"
"net/http"
"time"
"github.com/google/uuid"
"github.com/my-org/saga-coordinator/pkg/saga"
"github.com/my-org/saga-coordinator/pkg/store"
)
type Coordinator struct {
store store.SagaStore
httpClient *http.Client
}
// ... NewCoordinator constructor ...
// StartSaga a new transaction
func (c *Coordinator) StartSaga(name string, steps []saga.Step, initialContext json.RawMessage) (*saga.Saga, error) {
s := &saga.Saga{
ID: uuid.New().String(),
Name: name,
Steps: steps,
CurrentStep: 0,
Status: saga.StatusRunning,
CreatedAt: time.Now(),
Context: initialContext,
}
for i := range s.Steps {
s.Steps[i].Status = saga.StepStatusPending
}
log.Printf("Starting new saga %s (%s)", s.Name, s.ID)
if err := c.store.SaveSaga(s); err != nil {
return nil, err
}
// 异步触发第一步
go c.executeNextStep(s.ID)
return s, nil
}
// executeNextStep is the core engine loop
func (c *Coordinator) executeNextStep(sagaID string) {
s, err := c.store.GetSaga(sagaID)
if err != nil {
log.Printf("Error fetching saga %s for execution: %v", sagaID, err)
return
}
if s.Status != saga.StatusRunning {
log.Printf("Saga %s is not in RUNNING state, aborting execution.", sagaID)
return
}
if s.CurrentStep >= len(s.Steps) {
s.Status = saga.StatusCompleted
log.Printf("Saga %s completed successfully.", sagaID)
_ = c.store.SaveSaga(s)
return
}
step := &s.Steps[s.CurrentStep]
log.Printf("Executing step '%s' for saga %s", step.Name, sagaID)
// 在真实项目中,这里应该使用更健壮的 HTTP 客户端,并处理上下文传递 (e.g. tracing headers)
reqBody, _ := json.Marshal(step.Payload)
req, _ := http.NewRequest(step.Action.Method, step.Action.URL, bytes.NewBuffer(reqBody))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Saga-ID", sagaID) // 传递Saga ID
resp, err := c.httpClient.Do(req)
if err != nil || resp.StatusCode >= 400 {
log.Printf("Step '%s' for saga %s failed. Status: %d, Error: %v", step.Name, sagaID, resp.StatusCode, err)
step.Status = saga.StepStatusFailed
s.Status = saga.StatusCompensating
_ = c.store.SaveSaga(s)
go c.triggerCompensation(sagaID)
return
}
defer resp.Body.Close()
step.Status = saga.StepStatusSuccess
s.CurrentStep++
_ = c.store.SaveSaga(s)
// Immediately trigger the next step
c.executeNextStep(sagaID)
}
func (c *Coordinator) triggerCompensation(sagaID string) {
s, err := c.store.GetSaga(sagaID)
if err != nil {
log.Printf("Error fetching saga %s for compensation: %v", sagaID, err)
return
}
if s.Status != saga.StatusCompensating {
log.Printf("Saga %s is not in COMPENSATING state, aborting compensation.", sagaID)
return
}
log.Printf("Starting compensation for saga %s", sagaID)
// 从后往前执行已成功的步骤的补偿操作
for i := s.CurrentStep - 1; i >= 0; i-- {
step := &s.Steps[i]
if step.Status != saga.StepStatusSuccess {
continue
}
log.Printf("Compensating step '%s' for saga %s", step.Name, sagaID)
reqBody, _ := json.Marshal(step.Payload)
req, _ := http.NewRequest(step.Compensation.Method, step.Compensation.URL, bytes.NewBuffer(reqBody))
req.Header.Set("Content-Type", "application/json")
// 补偿操作必须幂等,并且尽可能保证成功。
// 在真实项目中,这里需要有重试机制和失败告警。
_, err := c.httpClient.Do(req)
if err != nil {
log.Printf("CRITICAL: Compensation for step '%s' failed for saga %s. Manual intervention required. Error: %v", step.Name, sagaID, err)
s.Status = saga.StatusFailed // 标记为最终失败
_ = c.store.SaveSaga(s)
return
}
step.Status = saga.StepStatusCompensated
_ = c.store.SaveSaga(s)
}
s.Status = saga.StatusFailed
log.Printf("Saga %s compensation finished, marked as FAILED.", sagaID)
_ = c.store.SaveSaga(s)
}
部署到 AWS EKS
将协调器服务和 Memcached 部署到 EKS 集群非常直接。
协调器的 deployment.yaml
:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: saga-coordinator
namespace: core
spec:
replicas: 3
selector:
matchLabels:
app: saga-coordinator
template:
metadata:
labels:
app: saga-coordinator
spec:
containers:
- name: coordinator
image: your-account.dkr.ecr.us-east-1.amazonaws.com/saga-coordinator:latest
ports:
- containerPort: 8080
env:
- name: MEMCACHED_SERVERS
value: "memcached-service.cache.svc.cluster.local:11211"
readinessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
---
apiVersion: v1
kind: Service
metadata:
name: saga-coordinator-service
namespace: core
spec:
selector:
app: saga-coordinator
ports:
- protocol: TCP
port: 80
targetPort: 8080
我们使用 Bitnami 的 Helm chart 来部署 Memcached 集群,这比手动管理 statefulset 要简单得多。
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install memcached bitnami/memcached --namespace cache --create-namespace
这会在 cache
命名空间中创建一个名为 memcached-service
的服务,我们的协调器通过这个内部 DNS 名称来连接它。
Ant Design 监控面板
一个好的 UI 对于运维至关重要。我们需要一个界面来实时查看所有 Saga 的状态,并对失败的事务进行干预。Ant Design Pro 的脚手架能让我们快速搭建起原型。
核心是 Saga 列表页,用 Table
组件展示,并通过 Steps
组件可视化每个事务的进度。
// src/pages/SagaList/index.jsx
import React, { useState, useEffect } from 'react';
import { Table, Tag, Steps, Button, Modal, message } from 'antd';
import { fetchSagas, retrySagaStep, compensateSaga } from '@/services/saga';
const { Step } = Steps;
const statusColorMap = {
RUNNING: 'processing',
COMPLETED: 'success',
COMPENSATING: 'warning',
FAILED: 'error',
};
const SagaList = () => {
const [sagas, setSagas] = useState([]);
const [loading, setLoading] = useState(true);
const loadSagas = async () => {
setLoading(true);
try {
const response = await fetchSagas(); // API call to coordinator
setSagas(response.data);
} catch (error) {
message.error('Failed to load sagas');
} finally {
setLoading(false);
}
};
useEffect(() => {
loadSagas();
const interval = setInterval(loadSagas, 5000); // 5秒轮询一次
return () => clearInterval(interval);
}, []);
const handleManualCompensate = (sagaId) => {
Modal.confirm({
title: 'Are you sure you want to manually trigger compensation?',
content: `This will start the compensation process for saga: ${sagaId}`,
onOk: async () => {
await compensateSaga(sagaId);
message.success('Compensation triggered');
loadSagas();
},
});
};
const columns = [
{ title: 'Saga ID', dataIndex: 'id', key: 'id', width: 300 },
{ title: 'Name', dataIndex: 'name', key: 'name' },
{
title: 'Status',
dataIndex: 'status',
key: 'status',
render: (status) => <Tag color={statusColorMap[status]}>{status}</Tag>,
},
{ title: 'Created At', dataIndex: 'created_at', key: 'created_at' },
{
title: 'Actions',
key: 'actions',
render: (_, record) => (
record.status === 'RUNNING' && (
<Button type="primary" danger onClick={() => handleManualCompensate(record.id)}>
Force Compensate
</Button>
)
),
},
];
const expandedRowRender = (saga) => {
return (
<div>
<h4>Saga Progress:</h4>
<Steps current={saga.current_step} status={saga.status === 'FAILED' ? 'error' : 'process'}>
{saga.steps.map((step) => (
<Step key={step.name} title={step.name} description={`Status: ${step.status}`} />
))}
</Steps>
</div>
);
};
return (
<Table
columns={columns}
dataSource={sagas}
loading={loading}
rowKey="id"
expandable={{ expandedRowRender }}
/>
);
};
export default SagaList;
这个界面虽然简单,但提供了核心功能:查看所有事务的列表、状态,并能展开查看每个事务的详细步骤进度。最重要的是,它提供了人工干预的入口,当自动补偿机制因网络等问题失败时,运维人员可以点击“强制补偿”按钮来重新触发补偿流程,这是保证系统鲁棒性的最后一道防线。
局限性与未来展望
这个基于 Memcached 的方案在性能上表现优异,协调器本身的开销极小,能够支撑高并发的事务请求。然而,它的核心弱点也正是其优势的来源——数据的易失性。如果 Coordinator Pod 和 Memcached Pod 同时发生故障,我们可能会丢失正在进行中的事务状态,导致数据不一致。虽然这种情况在 EKS 的多可用区部署下概率极低,但风险依然存在。
为了缓解这个问题,下一步的迭代计划是引入一个异步的、旁路的持久化机制。协调器在每次更新 Memcached 状态后,可以将 Saga 的变更事件(如 StepSuccess
, SagaCompleted
)发送到 Kafka 或 AWS SQS 队列中。一个独立的消费者服务可以从队列中读取这些事件,并将其持久化到 DynamoDB 或 PostgreSQL 中。这样,主流程的性能不受影响,同时我们拥有了用于灾难恢复和事后审计的完整事务日志。
另一个可以优化的方向是,对于补偿流程,目前简单的 HTTP 调用和重试逻辑可能不够健壮。可以考虑引入更成熟的重试库,如 backoff
,并对补偿失败的事务进行更精细化的告警,直接推送到 SRE 的告警平台,而不是仅仅依赖于人工在 Dashboard 上发现。