Solving Session Consistency in a Read/Write-Split Architecture with Pulsar Topic Policies


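A user updates their profile, clicks Save, refreshes the page, and sees the old information. This can happen in any system that splits database reads from writes. The root cause is replication delay: after the write succeeds on the master, the data takes time to reach the replicas. If the user's next read is routed to a replica that has not caught up yet, it returns stale data. This breaks the user's read-your-writes intuition ("what I write is what I see"); in this article we call it the session consistency problem.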

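Defining the Problem: The Trade-off Between Scalability and Consistency

As traffic grows, read/write splitting is the standard way to increase read capacity and take load off the master. The basic architecture looks like this: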

graph TD
    subgraph "Application Layer"
        A[Write API] --> C{Data Access Layer};
        B[Read API] --> C;
    end
    subgraph "Database Layer"
        C -- Writes --> D[Master DB];
        C -- Reads --> E[Read Replica 1];
        C -- Reads --> F[Read Replica 2];
        D -- Replication Lag --> E;
        D -- Replication Lag --> F;
    end
    U[User] --> A;
    U --> B;

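The inherent tension in this architecture is that writes go to the master, while the reads that follow may hit any replica. Replication lag is physically unavoidable: it is typically tens to hundreds of milliseconds, but can reach seconds under heavy load or network jitter. That lag window is what breaks session consistency.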

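Evaluating Option A: Front-End Polling or a Forced Delay

An obvious idea is to solve the problem in the front end. After the user submits a write, the front end can:

  1. Poll on a timer: issue a read request to the back end every 200 ms until the updated data comes back.
  2. Force a delay: simply disable refresh or query functions in the UI for 1-2 seconds and bet that replication finishes within that time.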

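Pros:

  • Simple to implement; the back-end architecture does not change.

Cons:

  • Polling puts a lot of useless read load on the server, and the interval is hard to choose: too long and the user waits; too short and the server suffers.
  • A forced delay is pure guesswork. It cannot guarantee success, and it hides the problem rather than solving it.
  • Both approaches give a poor user experience, because the user clearly perceives the wait or the stall. In real projects this option tends to be proposed early and rejected quickly for being crude and unreliable.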

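Evaluating Option B: Session-Level Read Routing

Another common improvement works in the back-end data access layer: detect a session's write and, for a fixed period afterwards, force all of that session's reads to the master.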

// Pseudo-code for session-based routing
const CACHE_TTL_SECONDS = 5;
const userWriteActivityCache = new Cache(); // e.g., Redis

// On Write Operation
async function handleWrite(userId: string, data: any) {
    await writeToMasterDB(data);
    // Mark this user as having recently written
    await userWriteActivityCache.set(`write_flag:${userId}`, true, CACHE_TTL_SECONDS);
}

// In Data Access Layer
async function getDBConnectionForRead(userId: string) {
    const hasRecentlyWritten = await userWriteActivityCache.get(`write_flag:${userId}`);
    if (hasRecentlyWritten) {
        // Force read from master for a short period
        return getMasterConnection();
    }
    return getReadReplicaConnection();
}

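Pros:

  • Transparent to the user, and session consistency holds as long as the TTL outlasts the replication lag.
  • The logic is contained in the data access layer, so it intrudes little on business code.

Cons:

  • It undermines the point of read/write splitting: in write-heavy scenarios (for example social apps or collaborative documents), a large share of read requests is routed back to the master, which can become the new bottleneck.
  • The "magic number" problem: CACHE_TTL_SECONDS is still a guess. Too long and the master takes more load; too short and it may not cover the replication lag.
  • Added architectural complexity: the data access layer has to implement stateful routing and depends on an external cache (such as Redis) to hold that state.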

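The Final Choice: An Asynchronous Consistency Signaling System Built on Pulsar and MobX

Both options above try to solve an asynchronous problem in a synchronous world. The option we finally chose embraces the asynchrony and builds an explicit, event-driven signaling system. The core idea: after a write completes, the system does not immediately treat the data as "readable". Instead, it sends a "sync complete" signal through a highly reliable message queue (Pulsar), the front-end state (MobX) updates the UI based on that signal, and subsequent reads are driven from there.

The flow is shown in the sequence diagram below: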

sequenceDiagram
    participant User/Browser
    participant React_MobX
    participant API_Gateway
    participant Write_Service
    participant Master_DB
    participant Pulsar
    participant Notification_Service
    participant Read_Replica_DB

    User/Browser->>React_MobX: 1. User performs a write (e.g., save profile)
    React_MobX->>API_Gateway: 2. Send POST /profile request
    API_Gateway->>Write_Service: 3. Route request
    Write_Service->>Master_DB: 4. Write data, generate versionId
    Master_DB-->>Write_Service: 5. Write succeeded
    Write_Service->>Pulsar: 6. Produce message (key=userId, payload={versionId})
    Write_Service-->>API_Gateway: 7. Return 200 OK immediately (payload={status: 'PENDING'})
    API_Gateway-->>React_MobX: 8. Return response
    React_MobX->>React_MobX: 9. MobX store sets status to PENDING, UI shows loading...

    Note right of Notification_Service: (WebSocket established and subscribed at login)
    Pulsar->>Notification_Service: 10. Message consumed
    Notification_Service->>React_MobX: 11. Push 'SYNCED' event over WebSocket (payload={versionId})
    React_MobX->>React_MobX: 12. MobX store sets status to SYNCED
    Note left of React_MobX: UI leaves loading state automatically, data is safe to read

    User/Browser->>React_MobX: 13. (Later) User issues a read
    React_MobX->>API_Gateway: 14. GET /profile
    API_Gateway->>Read_Replica_DB: 15. Read routed to a replica (data already synced)
    Read_Replica_DB-->>React_MobX: 16. Return latest data
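
In the flow above, the signal is produced right after the master commit, so a 'SYNCED' push is only as trustworthy as the assumption that the replicas have caught up by the time step 11 runs. One way to make that assumption explicit is for the notification service to confirm that a replica has reached the written version before it pushes. The sketch below is not part of the original design: `readReplicaVersion` is a hypothetical lookup (for example a query against a version column), and the code assumes a monotonically increasing numeric version rather than the UUID used later in this article.

```typescript
// Hypothetical lookup: returns the version a replica currently holds for
// this user, or null if the row is not there yet.
type VersionLookup = (userId: string) => Promise<number | null>;

// Resolves true once the replica reports a version >= writtenVersion,
// or false if it has not caught up within maxChecks attempts.
async function waitForReplica(
    readReplicaVersion: VersionLookup,
    userId: string,
    writtenVersion: number,
    maxChecks = 20,
    delayMs = 50,
    sleep: (ms: number) => Promise<void> =
        (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<boolean> {
    for (let check = 1; check <= maxChecks; check++) {
        const replicaVersion = await readReplicaVersion(userId);
        if (replicaVersion !== null && replicaVersion >= writtenVersion) {
            return true;
        }
        // No point sleeping after the final check.
        if (check < maxChecks) {
            await sleep(delayMs);
        }
    }
    return false;
}
```

Unlike the client-side polling of Option A, this check runs once per write on the server and is bounded by `maxChecks`; when it returns false, the service could push an error event instead of 'SYNCED'.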

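Why Pulsar?

  • Strong ordering: with the message key set to userId, Pulsar's Key_Shared subscription mode ensures that all update events for one user are processed in order by the same consumer. This is the foundation of logical consistency.
  • Persistence and high availability: Pulsar's BookKeeper-based log storage keeps signal messages reliable. Even if the notification service goes down, messages are not lost, and the service can resume consuming once it recovers.
  • Topic policies: a retention policy and a message TTL can be set per topic, which helps with debugging and tracing, and expired signals are cleaned up automatically.

Why MobX?

  • A fine-grained reactive system: MobX lets us create a ConsistencyStore whose state (such as 'PENDING' or 'SYNCED') is observed and reacted to by UI components automatically. When the WebSocket delivers a signal that originated in Pulsar, changing one observable property in the store is enough for every React component that depends on it to re-render. This is simpler and more robust than hand-written callbacks or manual state management.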

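Core Implementation Overview

We walk through the implementation in three core modules: the back-end write and notification services, the front-end MobX state management, and the communication protocol between them.

1. Back End: Write Service and Notification Service (Node.js + TypeScript)

Pulsar Client Configuration

A production-grade Pulsar client configuration needs to consider authentication, timeouts, and retries.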

// src/pulsar/client.ts
import Pulsar from 'pulsar-client';

const PULSAR_URL = process.env.PULSAR_URL || 'pulsar://localhost:6650';
const JWT_TOKEN = process.env.PULSAR_TOKEN;

let client: Pulsar.Client | null = null;

export const getPulsarClient = (): Pulsar.Client => {
    if (!client) {
        client = new Pulsar.Client({
            serviceUrl: PULSAR_URL,
            authentication: JWT_TOKEN ? new Pulsar.AuthenticationToken({ token: JWT_TOKEN }) : undefined,
            operationTimeoutSeconds: 30,
            log: (level, file, line, message) => {
                // Integrate with your preferred logger, e.g., Pino or Winston
                console.log(`[Pulsar] ${level} at ${file}:${line}: ${message}`);
            },
        });
    }
    return client;
};

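// Graceful shutdown: 'exit' handlers cannot await async work,
// so close the client on termination signals instead.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.once(signal, async () => {
        try {
            await client?.close();
        } finally {
            process.exit(0);
        }
    });
}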

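Write Service (WriteService)

This service handles the business write and, once the database transaction has succeeded, emits the Pulsar message. The key point: the message must be sent only after the transaction has committed. If it were sent before the commit and the transaction then rolled back, the result would be a "phantom" signal for an update that never happened. Commit and send are still two separate steps rather than one atomic operation, so a send that fails after a successful commit needs its own compensation.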

// src/services/profile.service.ts
import Pulsar from 'pulsar-client';
import { getPulsarClient } from '../pulsar/client';
import { v4 as uuidv4 } from 'uuid';

const CONSISTENCY_TOPIC = 'persistent://public/default/user-consistency-signal';

export class ProfileService {
    private producer: Pulsar.Producer | null = null;

    async initialize() {
        const client = getPulsarClient();
        this.producer = await client.createProducer({
            topic: CONSISTENCY_TOPIC,
            producerName: 'profile-service-producer',
            batchingEnabled: true,
            batchingMaxPublishDelayMs: 10,
        });
        console.log('Pulsar producer created for topic:', CONSISTENCY_TOPIC);
    }

    async updateUserProfile(userId: string, data: { name: string; email: string }) {
        // In a real application, this would be a database transaction
        // For demonstration, we simulate it.
        const dbTransaction = async () => {
            console.log(`[DB-MASTER] Writing profile for userId: ${userId}`);
            // Simulating DB write
            await new Promise(resolve => setTimeout(resolve, 50));
            return { versionId: uuidv4() }; 
        };

        try {
            const { versionId } = await dbTransaction();

            if (!this.producer) {
                throw new Error("Producer not initialized");
            }

            // After transaction commits, send the signal.
            await this.producer.send({
                data: Buffer.from(JSON.stringify({ userId, versionId, entity: 'profile' })),
                properties: { userId, versionId },
                orderingKey: userId, // Use userId as orderingKey for guaranteed order
            });

            console.log(`[Pulsar] Sent consistency signal for userId: ${userId}, versionId: ${versionId}`);
            
            return { status: 'PENDING', versionId };
        } catch (error) {
            console.error('Failed to update profile or send signal:', error);
            // Implement proper error handling and potential compensation logic
            throw new Error('Update failed');
        }
    }

    async close() {
        await this.producer?.close();
    }
}
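
The catch block above reports "Update failed" for both a failed database write and a failed send, even though in the second case the data is already committed. The sketch below separates the two outcomes. It is an illustration under stated assumptions rather than the service's actual code: `commit` and `sendSignal` are injected placeholders for the transaction and for `producer.send`, and the 'COMMITTED_UNSIGNALLED' status is a hypothetical addition that lets the client stop waiting for a 'SYNCED' event that will never arrive.

```typescript
type WriteOutcome =
    | { status: 'PENDING'; versionId: string }                // committed and signalled
    | { status: 'COMMITTED_UNSIGNALLED'; versionId: string }; // committed, signal lost

async function commitThenSignal(
    commit: () => Promise<string>,                    // resolves to versionId after commit
    sendSignal: (versionId: string) => Promise<void>, // e.g. a wrapper around producer.send
    maxSendAttempts = 3,
): Promise<WriteOutcome> {
    // If commit rejects, the error propagates and no signal is ever sent,
    // so a rolled-back write cannot produce a phantom signal.
    const versionId = await commit();

    for (let attempt = 1; attempt <= maxSendAttempts; attempt++) {
        try {
            await sendSignal(versionId);
            return { status: 'PENDING', versionId };
        } catch {
            // Swallow and retry; a real service would log and back off here.
        }
    }
    // The write is durable, but the client will not receive SYNCED for it.
    return { status: 'COMMITTED_UNSIGNALLED', versionId };
}
```

On 'COMMITTED_UNSIGNALLED' the front end could fall back to a short delay or a read from the master instead of staying in the loading state.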

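Notification Service (NotificationService with WebSockets)

This service manages WebSocket connections and creates a dedicated Pulsar consumer for each connected user to listen for that user's signals.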

// src/services/notification.service.ts
import WebSocket from 'ws';
import { Server as HttpServer } from 'http';
import Pulsar from 'pulsar-client';
import { v4 as uuidv4 } from 'uuid';
import { getPulsarClient } from '../pulsar/client';

const CONSISTENCY_TOPIC = 'persistent://public/default/user-consistency-signal';

export class NotificationService {
    private wss: WebSocket.Server;
    // In a real-world scalable app, this state should be in Redis or another distributed store
    private userConnections: Map<string, WebSocket> = new Map();
    private userConsumers: Map<string, Pulsar.Consumer> = new Map();

    constructor(server: HttpServer) {
        this.wss = new WebSocket.Server({ server, path: '/ws/consistency' });
        this.initialize();
    }

    private initialize() {
        this.wss.on('connection', (ws, req) => {
            // In a real app, you'd get userId from a JWT in the connection URL or headers.
            const userId = new URL(req.url!, `http://${req.headers.host}`).searchParams.get('userId');

            if (!userId) {
                ws.close(1008, 'User ID is required');
                return;
            }

            console.log(`[WebSocket] Client connected for userId: ${userId}`);
            this.userConnections.set(userId, ws);
            this.subscribeToUserSignals(userId, ws);

            ws.on('close', () => {
                console.log(`[WebSocket] Client disconnected for userId: ${userId}`);
                this.cleanupUser(userId);
            });

            ws.on('error', (error) => {
                console.error(`[WebSocket] Error for userId: ${userId}`, error);
                this.cleanupUser(userId);
            });
        });
    }

    private async subscribeToUserSignals(userId: string, ws: WebSocket) {
        try {
            const client = getPulsarClient();
            const consumer = await client.subscribe({
                topic: CONSISTENCY_TOPIC,
                subscription: `notification-service-sub-${userId}-${uuidv4()}`, // Unique subscription per user session
                subscriptionType: 'Exclusive', // Only this consumer gets messages for this subscription
                // We filter messages on the consumer side, but Pulsar's Key_Shared is better for load balancing across service instances.
                // For this single-instance example, filtering by property is clearer.
            });
            this.userConsumers.set(userId, consumer);
            
            // Start listening loop
            (async () => {
                while (this.userConnections.has(userId)) { // Loop only while connection is active
                    try {
                        const msg = await consumer.receive(30000); // 30s timeout
                        if(!msg) continue;

                        const msgProperties = msg.getProperties();
                        if (msgProperties.userId === userId) {
                            const payload = JSON.parse(msg.getData().toString());
                            console.log(`[Pulsar->WS] Forwarding message to userId: ${userId}`);
                            ws.send(JSON.stringify({ type: 'SYNCED', ...payload }));
                        }
                        await consumer.acknowledge(msg);
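                    } catch (error) {
                        // Receive timeouts are expected. Match case-insensitively, because the
                        // casing of the timeout message is not guaranteed; log anything else.
                        const message = error instanceof Error ? error.message : String(error);
                        if (!/timeout/i.test(message)) {
                            console.error(`[Pulsar Consumer] Error for userId ${userId}:`, error);
                            break; // Exit loop on persistent error
                        }
                    }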
                }
            })();

        } catch (error) {
            console.error(`[Pulsar] Failed to subscribe for userId: ${userId}`, error);
            ws.close(1011, 'Internal server error');
        }
    }

    private async cleanupUser(userId: string) {
        this.userConnections.delete(userId);
        const consumer = this.userConsumers.get(userId);
        if (consumer) {
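            // close() leaves the durable subscription on the broker. Because each session
            // uses a unique subscription name, consider unsubscribing (or a non-durable
            // subscription) so abandoned backlogs do not accumulate.
            await consumer.close();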
            this.userConsumers.delete(userId);
            console.log(`[Pulsar Consumer] Closed consumer for userId: ${userId}`);
        }
    }
}
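
With one Exclusive subscription per session, every connected user receives and discards every other user's signals, and the single-entry `userConnections` map lets a second browser tab overwrite the first. The code comment above points to a shared subscription as the better fit. The sketch below covers only the dispatch half of that idea, assuming a single consumer per service instance hands each decoded message to a router. `SignalRouter`, `Signal`, and the `send` callback are illustrative names, not part of the pulsar-client or ws APIs.

```typescript
interface Signal {
    userId: string;
    versionId: string;
    entity: string;
}

// Anything that can deliver a text frame, e.g. (p) => ws.send(p).
type Send = (payload: string) => void;

class SignalRouter {
    private readonly connections = new Map<string, Set<Send>>();

    // Register a connection; returns a function that unregisters it.
    register(userId: string, send: Send): () => void {
        let senders = this.connections.get(userId);
        if (!senders) {
            senders = new Set<Send>();
            this.connections.set(userId, senders);
        }
        senders.add(send);
        return () => {
            const current = this.connections.get(userId);
            if (!current) return;
            current.delete(send);
            if (current.size === 0) this.connections.delete(userId);
        };
    }

    // Forward one consumed signal; returns how many connections received it.
    dispatch(signal: Signal): number {
        const senders = this.connections.get(signal.userId);
        if (!senders) return 0;
        const payload = JSON.stringify({ type: 'SYNCED', ...signal });
        senders.forEach((send) => send(payload));
        return senders.size;
    }
}
```

In the connection handler this would replace `subscribeToUserSignals`: call `register` on connect, call the returned function on close, and let one receive loop call `dispatch` for every message.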

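2. Front End: MobX State Management and React Components (React + MobX)

Consistency State Store (ConsistencyStore)

This is the core of the front end. It tracks the sync status of each data entity.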

// src/stores/ConsistencyStore.ts
import { makeAutoObservable, observable, runInAction } from 'mobx';

type EntityName = 'profile' | 'settings' | 'document';
type SyncStatus = 'IDLE' | 'PENDING' | 'SYNCED' | 'ERROR';

class ConsistencyStore {
    // Using a map to track the status of multiple entities
    entityStatus = observable.map<EntityName, SyncStatus>();

    constructor() {
        makeAutoObservable(this);
    }

    // Action to set status to PENDING right after a write request is sent
    setPending(entity: EntityName) {
        this.entityStatus.set(entity, 'PENDING');
        console.log(`[MobX] Status for ${entity} set to PENDING.`);
    }

    // Action called by WebSocket service when a sync signal is received
    setSynced(entity: EntityName) {
        this.entityStatus.set(entity, 'SYNCED');
        console.log(`[MobX] Status for ${entity} set to SYNCED.`);
        // Optional: reset to IDLE after a delay. The callback runs outside the
        // action, so wrap the mutation in runInAction, and skip the reset if a
        // newer save has already moved the status on.
        setTimeout(() => runInAction(() => {
            if (this.entityStatus.get(entity) === 'SYNCED') {
                this.entityStatus.set(entity, 'IDLE');
            }
        }), 2000);
    }

    setError(entity: EntityName) {
        this.entityStatus.set(entity, 'ERROR');
    }

    // Helper for components (a plain method rather than a MobX computed, because it takes an argument)
    isPending(entity: EntityName): boolean {
        return this.entityStatus.get(entity) === 'PENDING';
    }
}

export const consistencyStore = new ConsistencyStore();
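
The store above keys status by entity only, so a 'SYNCED' signal for an earlier save would also clear the 'PENDING' state of a later one. Since every signal already carries a `versionId`, one option is to remember which version each entity is waiting for. The class below is a framework-free sketch of that bookkeeping; `PendingVersions` and its method names are assumptions for illustration, and in practice the same logic would live inside the MobX store's actions.

```typescript
type Status = 'IDLE' | 'PENDING' | 'SYNCED';

class PendingVersions {
    private readonly awaited = new Map<string, string>(); // entity -> awaited versionId
    private readonly status = new Map<string, Status>();

    // Call with the versionId returned by the write API.
    markPending(entity: string, versionId: string): void {
        this.awaited.set(entity, versionId);
        this.status.set(entity, 'PENDING');
    }

    // Call for each SYNCED signal; returns true only if it matches the
    // write this entity is currently waiting for.
    markSynced(entity: string, versionId: string): boolean {
        if (this.awaited.get(entity) !== versionId) {
            return false; // stale or unknown signal, leave status untouched
        }
        this.awaited.delete(entity);
        this.status.set(entity, 'SYNCED');
        return true;
    }

    statusOf(entity: string): Status {
        return this.status.get(entity) ?? 'IDLE';
    }
}
```

One caveat of this sketch: a signal can arrive over the WebSocket before the HTTP response that carries the `versionId`, in which case it is dropped as unknown. A fuller version would buffer unmatched signals for a short time.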

WebSocket Service (WebSocketService)

Establishes the connection and calls ConsistencyStore actions based on the messages it receives.

// src/services/WebSocketService.ts
import { consistencyStore } from '../stores/ConsistencyStore';

let socket: WebSocket | null = null;

export const connectWebSocket = (userId: string) => {
    if (socket && socket.readyState === WebSocket.OPEN) {
        return;
    }

    const wsUrl = `ws://localhost:8080/ws/consistency?userId=${encodeURIComponent(userId)}`;
    socket = new WebSocket(wsUrl);

    socket.onopen = () => {
        console.log('[WebSocket] Connection established.');
    };

    socket.onmessage = (event) => {
        try {
            const message = JSON.parse(event.data);
            if (message.type === 'SYNCED' && message.entity) {
                console.log('[WebSocket] Received SYNCED signal:', message);
                consistencyStore.setSynced(message.entity);
            }
        } catch (error) {
            console.error('[WebSocket] Error parsing message:', error);
        }
    };

    socket.onclose = (event) => {
        console.log('[WebSocket] Connection closed:', event.code, event.reason);
        // Implement reconnection logic with backoff strategy here
    };

    socket.onerror = (error) => {
        console.error('[WebSocket] Error:', error);
    };
};

export const disconnectWebSocket = () => {
    socket?.close();
};
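
The `onclose` handler above leaves reconnection as a comment. A common approach is capped exponential backoff with jitter, and the helper below computes the delay for a given attempt. The base delay, the cap, and the injectable `random` parameter are assumptions chosen for illustration.

```typescript
// Delay before reconnect attempt `attempt` (0-based), using "full jitter":
// a uniform value in [0, min(capMs, baseMs * 2^attempt)).
function reconnectDelayMs(
    attempt: number,
    baseMs = 500,
    capMs = 30_000,
    random: () => number = Math.random,
): number {
    const windowMs = Math.min(capMs, baseMs * 2 ** attempt);
    return Math.floor(random() * windowMs);
}
```

Inside `onclose`, a caller could schedule `connectWebSocket(userId)` with `setTimeout` and `reconnectDelayMs(attempt)`, incrementing `attempt` on each close and resetting it to 0 in `onopen`. The jitter spreads reconnects out so that a server restart does not trigger a connection storm.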

React Component (ProfileForm)

With the observer higher-order component, the UI reacts to ConsistencyStore state with almost no extra code.

// src/components/ProfileForm.tsx
import React, { useState } from 'react';
import { observer } from 'mobx-react-lite';
import { consistencyStore } from '../stores/ConsistencyStore';
import { api } from '../api'; // Your API client

export const ProfileForm = observer(() => {
    const [name, setName] = useState('John Doe');
    const isSaving = consistencyStore.isPending('profile');

    const handleSave = async () => {
        consistencyStore.setPending('profile');
        try {
            await api.updateProfile({ name });
            // The API returns PENDING status, UI state is already handled by MobX
            // The SYNCED signal will arrive via WebSocket later.
        } catch {
            consistencyStore.setError('profile');
        }
    };

    return (
        <div>
            <h3>Edit Profile</h3>
            <input 
                type="text" 
                value={name} 
                onChange={e => setName(e.target.value)}
                disabled={isSaving}
            />
            <button onClick={handleSave} disabled={isSaving}>
                {isSaving ? 'Saving...' : 'Save'}
            </button>
            {consistencyStore.entityStatus.get('profile') === 'SYNCED' && 
                <span style={{ color: 'green', marginLeft: '10px' }}>✓ Saved and synced!</span>
            }
        </div>
    );
});

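Extensibility and Limitations of the Architecture

Extensibility:

  • Multiple entities: the pattern extends to any data entity that needs session consistency; it only takes an entity type field in the Pulsar message.
  • Service decoupling: the write service and the notification service are fully decoupled. They communicate only through Pulsar and can be deployed, scaled, and maintained independently.
  • Fault tolerance: if the notification service is temporarily unavailable, Pulsar retains the signal messages. Once the service recovers, it resumes from its last consumed position, so signals are eventually delivered. This relies on a durable subscription with a stable name.

Limitations:

  • Architectural complexity: introducing Pulsar and a WebSocket service significantly raises operational cost and system complexity. For a simple application this design is over-engineering.
  • WebSocket connection management: maintaining a large number of long-lived WebSocket connections is a challenge in itself; load balancing, connection storms, and horizontal scaling of the server all need attention.
  • Eventual rather than strong consistency: this design addresses session consistency as the user perceives it. It does not provide transaction-level strong consistency across database replicas. It is only an elegant notification mechanism that tells the client "the data you wrote can now be read safely from any replica".
  • Dependence on Pulsar: the real-time notification capability depends entirely on the health of the Pulsar cluster. Pulsar's latency and availability directly affect the user experience.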
