构建金融风控实时特征平台中利用分布式锁保证 Spark 计算一致性的架构权衡


在金融风控场景下,特征计算平台的稳定性和数据一致性是整个系统的基石。我们面临的核心挑战是:如何在一个分布式的环境中,调度数百个 Apache Spark 作业,对海量的用户数据进行周期性、分片区的特征计算,同时确保任何一个计算单元(例如,某个用户的近7日交易行为特征)在同一时间点不会被多个并发作业实例重复计算,从而避免数据覆盖、状态错乱等严重问题。

问题的本质是一个细粒度的、跨 Spark 集群的资源争用问题。传统的作业调度系统(如 Airflow, Oozie)能保证作业级别的互斥,但无法深入到作业内部,对作业处理的某个数据分区进行锁定。

方案一:依赖数据库事务的朴素锁定

最初的构想是利用团队已经广泛使用的 PostgreSQL 数据库来充当协调器。基本思路是设计一张锁表(computation_locks),包含 lock_key, job_id, created_at, expires_at 等字段。

lock_key 的设计至关重要,它直接决定了锁的粒度。例如,一个计算用户 uid=123feature_group_A 特征的作业,其 lock_key 可以设计为 feature_group_A:123

作业开始处理一个分区前,会执行类似如下的数据库事务:

BEGIN;
SELECT * FROM computation_locks WHERE lock_key = 'feature_group_A:123' FOR UPDATE;
-- 如果查询结果为空,说明锁未被持有
-- 插入新锁记录
INSERT INTO computation_locks (lock_key, job_id, expires_at) VALUES ('feature_group_A:123', 'spark_job_run_id_abc', NOW() + INTERVAL '15 minutes');
-- 如果查询结果非空,说明锁已被持有,当前作业应跳过该分区或等待
COMMIT;

在 Spark 作业中,实现这个逻辑的伪代码大致如下:

// 在 Spark Driver 或 Executor 端执行
def acquireDbLock(partitionKey: String): Boolean = {
  var connection: Connection = null
  var statement: PreparedStatement = null
  try {
    connection = ConnectionPool.getConnection()
    connection.setAutoCommit(false)

    // 1. 尝试锁定行,如果行不存在,FOR UPDATE不会报错
    val selectSql = "SELECT lock_key FROM computation_locks WHERE lock_key = ? FOR UPDATE"
    statement = connection.prepareStatement(selectSql)
    statement.setString(1, partitionKey)
    val rs = statement.executeQuery()

    if (rs.next()) {
      // 锁已存在,获取失败
      connection.rollback()
      logger.warn(s"Lock for key [$partitionKey] is already held.")
      false
    } else {
      // 锁不存在,可以获取
      val insertSql = "INSERT INTO computation_locks (lock_key, job_id, expires_at) VALUES (?, ?, NOW() + INTERVAL '15 minutes')"
      val insertStatement = connection.prepareStatement(insertSql)
      insertStatement.setString(1, partitionKey)
      insertStatement.setString(2, "current_spark_job_id")
      insertStatement.executeUpdate()
      connection.commit()
      logger.info(s"Successfully acquired lock for key [$partitionKey].")
      true
    }
  } catch {
    case e: Exception =>
      if (connection != null) connection.rollback()
      logger.error(s"Error acquiring lock for key [$partitionKey]", e)
      false
  } finally {
    // 关闭资源
    if (statement != null) statement.close()
    if (connection != null) {
      connection.setAutoCommit(true)
      connection.close()
    }
  }
}

这个方案的优点是显而易见的:实现简单,技术栈收敛,不需要引入新的中间件。然而,在真实项目中,这个方案的脆弱性很快就暴露了:

  1. 数据库成为性能瓶颈:当数千个 Spark Executor Task 并发请求锁时,数据库的连接数和事务处理能力会立刻成为瓶颈。FOR UPDATE 会产生行级锁,在高并发下,锁等待会迅速累积,甚至导致整个锁表的热点竞争,拖慢所有计算作业。
  2. 可用性问题:整个特征计算平台强依赖于这个中心化的数据库。一旦数据库发生抖动或宕机,所有计算任务都会失败或停滞,这是一个典型的单点故障。
  3. 僵尸锁处理复杂:如果一个持有锁的 Spark Executor 异常崩溃,它没有机会执行 DELETE 操作来释放锁。我们必须依赖一个额外的定时清理任务,根据 expires_at 字段来删除过期锁。这个清理任务本身的设计和维护也带来了额外的复杂性。

在生产环境中,我们观察到大量的作业因为等待数据库锁而导致处理时间从10分钟延长到超过1小时,这是完全无法接受的。

方案二:基于 Redis 的高性能分布式锁

我们决定转向一个专用的分布式协调系统。团队对 Redis 有深入的运维经验,因此它成为首选。相比数据库,Redis 提供了原子操作(如 SETNX)、高性能的内存读写以及键过期机制,这些特性天然适合分布式锁的场景。

我们选择使用 Redisson,一个成熟的 Redis Java 客户端,它提供了健壮的、可重入的分布式锁实现,并内置了“看门狗”(watchdog)机制来处理锁的自动续期,有效防止了业务逻辑执行时间超过锁的初始 TTL 导致的锁失效问题。

下面是这个方案的整体架构设计。

graph TD
    subgraph "配置中心"
        Apollo(Apollo Client)
    end

    subgraph "调度与计算层"
        Scheduler(作业调度器 e.g., Airflow)
        SparkCluster[Apache Spark 集群]
        SparkDriver(Spark Driver)
        Executor1(Spark Executor 1)
        Executor2(Spark Executor 2)
        ExecutorN(Spark Executor N)
    end

    subgraph "协调服务"
        Redis(Redis Sentinel/Cluster)
    end

    subgraph "数据存储与服务层"
        DataLake(数据湖 Delta Lake / Hudi)
        GraphQLService(GraphQL API 服务)
    end
    
    subgraph "消费方"
        MLModel(机器学习模型服务)
        Dashboard(风控分析平台)
    end

    Apollo -- "下发作业配置(e.g., lock TTL, feature logic)" --> SparkDriver
    Scheduler -- "触发" --> SparkDriver
    SparkDriver -- "分发任务" --> Executor1
    SparkDriver -- "分发任务" --> Executor2
    SparkDriver -- "分发任务" --> ExecutorN
    
    Executor1 -- "1. 尝试获取锁 RLock.tryLock()" --> Redis
    Executor2 -- "1. 尝试获取锁 RLock.tryLock()" --> Redis
    ExecutorN -- "1. 尝试获取锁 RLock.tryLock()" --> Redis
    Redis -- "2. 返回锁状态" --> Executor1
    
    Executor1 -- "3. 若成功, 计算特征" --> DataLake
    Executor1 -- "4. 计算完成, 释放锁 RLock.unlock()" --> Redis

    GraphQLService -- "查询特征" --> DataLake
    MLModel -- "GraphQL Query" --> GraphQLService
    Dashboard -- "GraphQL Query" --> GraphQLService

1. 分布式锁管理器实现

我们封装了一个 DistributedLockManager,供 Spark 作业调用。

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class DistributedLockManager {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
    private static volatile RedissonClient redissonClient;

    // 使用双重检查锁定模式实现单例
    public static void initialize(String redisAddress, String password) {
        if (redissonClient == null) {
            synchronized (DistributedLockManager.class) {
                if (redissonClient == null) {
                    Config config = new Config();
                    // 假设使用单点模式用于演示,生产环境应使用哨兵或集群模式
                    // config.useSentinelServers().addSentinelAddress(...);
                    config.useSingleServer()
                          .setAddress(redisAddress)
                          .setPassword(password)
                          .setConnectionPoolSize(64)
                          .setConnectionMinimumIdleSize(10);
                    redissonClient = Redisson.create(config);
                    logger.info("Redisson client initialized successfully.");
                }
            }
        }
    }

    public static RedissonClient getClient() {
        if (redissonClient == null) {
            throw new IllegalStateException("DistributedLockManager has not been initialized. Call initialize() first.");
        }
        return redissonClient;
    }

    /**
     * 执行一个被分布式锁保护的代码块。
     * 这是一个关键的封装,确保了锁的正确获取和释放。
     *
     * @param lockKey 锁的唯一标识
     * @param waitTimeSeconds 尝试获取锁的最大等待时间
     * @param leaseTimeSeconds 锁的持有时间(租约时间)。Redisson的看门狗会自动续期。
     * @param task 要执行的任务,它应该返回一个结果
     * @param <T> 任务返回值的类型
     * @return 任务的返回值,如果获取锁失败则返回 null
     */
    public static <T> T executeWithLock(String lockKey, long waitTimeSeconds, long leaseTimeSeconds, Supplier<T> task) {
        RLock lock = getClient().getLock(lockKey);
        boolean isLocked = false;
        try {
            // 尝试在指定时间内获取锁
            isLocked = lock.tryLock(waitTimeSeconds, leaseTimeSeconds, TimeUnit.SECONDS);
            if (isLocked) {
                logger.debug("Lock acquired for key: {}", lockKey);
                // 成功获取锁,执行业务逻辑
                return task.get();
            } else {
                logger.warn("Failed to acquire lock for key: {} within {} seconds.", lockKey, waitTimeSeconds);
                return null; // 或者可以抛出自定义异常
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Thread was interrupted while trying to acquire lock for key: {}", lockKey, e);
            return null;
        } finally {
            // 必须在 finally 块中释放锁,且只有在当前线程持有锁时才释放
            if (isLocked && lock.isHeldByCurrentThread()) {
                lock.unlock();
                logger.debug("Lock released for key: {}", lockKey);
            }
        }
    }

    public static void shutdown() {
        if (redissonClient != null) {
            redissonClient.shutdown();
        }
    }
}

设计考量:

  • 单例模式: RedissonClient 是一个重量级对象,包含连接池和线程池。在整个 Spark 应用(Driver 和 Executors)中,它应该是单例的。我们通过双重检查锁定来确保其线程安全地初始化。
  • 高阶函数封装: executeWithLock 方法接受一个 Supplier<T> 函数式接口。这种设计模式将锁的获取和释放逻辑与业务逻辑解耦,极大地降低了业务代码出错的风险。使用者只需要关注自己的业务 task,无需手动管理 lock.unlock(),避免了忘记释放锁的常见错误。
  • 精细化的超时控制: tryLock 提供了 waitTimeleaseTime 两个关键参数。waitTime 防止任务因无法获取锁而无限期阻塞。leaseTime 结合 Redisson 的看门狗机制,即使任务执行时间超过预期,只要客户端存活,锁就不会被意外释放。

2. Spark 作业与锁的集成

Spark 作业的核心逻辑现在变得非常清晰。它在处理每个数据分区时,都会调用 DistributedLockManager

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import com.typesafe.config.ConfigFactory // 用于加载配置

object FeatureComputationJob {
  
  private val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // Apollo/本地配置加载 (生产中这部分逻辑会更复杂)
    val appConfig = ConfigFactory.load()
    val redisAddress = appConfig.getString("redis.address")
    val redisPassword = appConfig.getString("redis.password")
    val lockWaitTime = appConfig.getLong("lock.wait_time_seconds")
    val lockLeaseTime = appConfig.getLong("lock.lease_time_seconds")

    // 在 Driver 端初始化分布式锁管理器
    DistributedLockManager.initialize(redisAddress, redisPassword)

    val spark = SparkSession.builder()
      .appName("FeatureComputationJobWithDistributedLock")
      .getOrCreate()
      
    // 读取源数据
    val sourceData: DataFrame = spark.read.parquet("/path/to/raw/data")

    // 假设我们按 user_id 进行分区计算
    // repartition 的目的是为了后续的 foreachPartition 操作
    val partitionedData = sourceData.repartition(1000, col("user_id"))

    partitionedData.foreachPartition { partitionIterator =>
      // 这段代码在 Executor 上执行
      // 再次确保 Executor 上的 RedissonClient 实例被初始化
      // Spark 的闭包序列化机制,单例对象需要这样处理
      if (DistributedLockManager.getClient() == null) {
        DistributedLockManager.initialize(redisAddress, redisPassword)
      }

      // 对分区内的数据进行分组,因为一个 RDD 分区可能包含多个 user_id
      val groupedData = partitionIterator.toList.groupBy(_.getAs[String]("user_id"))

      groupedData.foreach { case (userId, rows) =>
        val lockKey = s"feature_computation:user_features:${userId}"
        
        // 使用我们封装好的高阶函数
        val result = DistributedLockManager.executeWithLock(lockKey, lockWaitTime, lockLeaseTime, () => {
          // --- 业务逻辑开始 ---
          logger.info(s"Processing feature for user $userId")
          
          // 伪代码:执行复杂的特征计算
          // val features = calculateFeatures(rows)
          // saveFeaturesToDataLake(userId, features)
          
          // 返回一个处理结果,比如处理的行数
          rows.size
          // --- 业务逻辑结束 ---
        })

        result match {
          case Some(rowCount) => logger.info(s"Successfully processed $rowCount rows for user $userId.")
          case None => logger.warn(s"Skipped processing for user $userId due to lock contention or error.")
        }
      }
    }

    spark.stop()
    DistributedLockManager.shutdown()
  }
}

单元测试思路:

DistributedLockManager 的测试至关重要。我们可以使用 testcontainers 库启动一个临时的 Redis 实例。测试用例应覆盖:

  • 单线程成功获取和释放锁。
  • 两个并发线程争用同一个锁,只有一个能成功。
  • tryLock 在等待超时后正确返回 false
  • 任务执行时抛出异常,锁依然能被正确释放。

3. 使用 Apollo Client 实现动态配置

硬编码 Redis 地址、超时时间等参数是生产环境的大忌。我们使用 Apollo 配置中心来管理这些参数。

Spark 作业启动时,Driver 会从 Apollo 拉取最新的配置,包括 Redis 连接信息、锁的 waitTimeleaseTime、特征计算的业务开关等。这使得我们可以在不重新打包和部署 Spark 应用的情况下,在线调整锁的行为策略或开启/关闭某些特征的计算。

// 在 Spark Driver 的初始化部分
// 使用 Apollo 官方客户端
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;

public class ConfigManager {
    private static final String APP_ID = "spark-feature-platform";
    private static final String NAMESPACE = "application";
    
    static {
        // 在 JVM 参数中设置 Apollo Meta Server 地址和 App ID
        System.setProperty("app.id", APP_ID); 
        // System.setProperty("apollo.meta", "http://apollo-meta-server");
    }

    private final Config config;

    public ConfigManager() {
        this.config = ConfigService.getConfig(NAMESPACE);
    }

    public String getRedisAddress() {
        // "default_value" 提供了容错能力
        return config.getProperty("redis.address", "redis://localhost:6379");
    }
    
    public long getLockWaitTime() {
        return config.getLongProperty("lock.wait_time_seconds", 5L);
    }
    
    // ... 其他配置获取方法
}

一个常见的错误是在 Spark 的 mapforeachPartition 闭包中直接初始化 Apollo 客户端。这会导致每个 Task 都尝试连接 Apollo,造成不必要的开销和潜在问题。正确的做法是在 Driver 端一次性拉取所有需要的配置,然后通过闭包将这些值(而不是配置客户端对象)传递给 Executors。

4. GraphQL 作为特征服务层

特征计算完成后,数据被写入数据湖(例如 Delta Lake 格式)。下游的机器学习模型服务需要一个高效、灵活的方式来查询这些特征。相比于传统的 RESTful API,GraphQL 在这里展现了巨大优势。

  • 精确获取: 模型服务可能只需要一个用户上百个特征中的某几个。GraphQL 允许客户端精确声明所需字段,避免了不必要的数据传输。
  • 强类型 Schema: 特征的名称、类型、描述都在 GraphQL Schema 中被严格定义,构成了自文档化的 API。
  • 演进性: 新增特征只需要在 Schema 和后端实现中添加,不会影响到现有的客户端查询。

一个简化的 GraphQL Schema 定义可能如下:

# schema.graphqls

type Query {
    """
    获取用户的特征集
    """
    getUserFeatures(
        userId: ID!,
        """
        指定要查询的特征名称列表,如果为空,则返回所有可用特征
        """
        featureNames: [String!]
        """
        查询特定时间点的特征快照,用于模型回溯。格式:ISO 8601
        """
        asOf: String
    ): UserFeatures
}

type UserFeatures {
    userId: ID!
    """
    特征键值对
    """
    features: [Feature!]
    """
    特征版本或计算时间戳
    """
    timestamp: String!
}

type Feature {
    name: String!
    value: String! # 使用 String 类型以获得最大的灵活性,具体类型由消费方解析
    valueType: FeatureType!
}

enum FeatureType {
    NUMERIC
    CATEGORICAL
    BOOLEAN
}

后端的 GraphQL 服务(可以使用 Spring for GraphQL 或 DGS 等框架)的 DataFetcher 负责解析查询,然后与数据湖进行交互(例如,通过 Spark Thrift Server, Trino, 或者直接读取 Delta Lake 文件),获取数据并组装成响应。它与前面的计算流程是完全解耦的,只负责读取最终结果。

架构的扩展性与局限性

这套基于 Redis 分布式锁的 Spark 计算架构,结合 Apollo 的动态配置和 GraphQL 的灵活服务,解决了我们最初面临的核心一致性问题,并为平台带来了良好的可维护性和扩展性。我们可以通过向 Apollo 添加新配置来定义新的特征组,开发新的 Spark 作业,而无需改动核心的锁和调度框架。

然而,这个架构并非万能。

它的主要适用场景是近实时(Near Real-time)的批处理计算,延迟在分钟级别。对于需要毫秒级响应的实时特征(例如,用户在当前会话中的点击次数),这套体系就显得过重。这类需求需要一个基于流处理(如 Flink)的独立管道来补充。

其次,当计算集群规模和作业密度达到一个极高的水平时,Redis 本身也可能成为瓶颈。尽管 Redis 性能卓越,但每秒数万次的 tryLock 请求仍然会消耗其 CPU 资源。届时,可能需要对锁的键进行分片,将压力分散到多个 Redis 集群,或者在作业调度层面进行优化,引入更智能的避让策略,从源头上减少锁的争用。

最后,对分布式锁的依赖也引入了新的运维复杂性。对 Redis 集群的监控、告警和故障恢复预案必须做得非常完善,因为它的稳定性直接关系到整个数据管道的健康。一个常见的坑在于对 Redlock 算法的盲目信任,需要清楚地认识到在存在网络分区和严重 GC 停顿的极端情况下,它并不能提供绝对的安全保证。对于我们的金融风控场景,当前的一致性保证级别与业务风险是匹配的,但这始终是一个需要持续评估的权衡点。


  目录