在金融风控场景下,特征计算平台的稳定性和数据一致性是整个系统的基石。我们面临的核心挑战是:如何在一个分布式的环境中,调度数百个 Apache Spark 作业,对海量的用户数据进行周期性、分片区的特征计算,同时确保任何一个计算单元(例如,某个用户的近7日交易行为特征)在同一时间点不会被多个并发作业实例重复计算,从而避免数据覆盖、状态错乱等严重问题。
问题的本质是一个细粒度的、跨 Spark 集群的资源争用问题。传统的作业调度系统(如 Airflow, Oozie)能保证作业级别的互斥,但无法深入到作业内部,对作业处理的某个数据分区进行锁定。
方案一:依赖数据库事务的朴素锁定
最初的构想是利用团队已经广泛使用的 PostgreSQL 数据库来充当协调器。基本思路是设计一张锁表(computation_locks
),包含 lock_key
, job_id
, created_at
, expires_at
等字段。
lock_key
的设计至关重要,它直接决定了锁的粒度。例如,一个计算用户 uid=123
的 feature_group_A
特征的作业,其 lock_key
可以设计为 feature_group_A:123
。
作业开始处理一个分区前,会执行类似如下的数据库事务:
BEGIN;
SELECT * FROM computation_locks WHERE lock_key = 'feature_group_A:123' FOR UPDATE;
-- 如果查询结果为空,说明锁未被持有
-- 插入新锁记录
INSERT INTO computation_locks (lock_key, job_id, expires_at) VALUES ('feature_group_A:123', 'spark_job_run_id_abc', NOW() + INTERVAL '15 minutes');
-- 如果查询结果非空,说明锁已被持有,当前作业应跳过该分区或等待
COMMIT;
在 Spark 作业中,实现这个逻辑的伪代码大致如下:
// 在 Spark Driver 或 Executor 端执行
def acquireDbLock(partitionKey: String): Boolean = {
var connection: Connection = null
var statement: PreparedStatement = null
try {
connection = ConnectionPool.getConnection()
connection.setAutoCommit(false)
// 1. 尝试锁定行,如果行不存在,FOR UPDATE不会报错
val selectSql = "SELECT lock_key FROM computation_locks WHERE lock_key = ? FOR UPDATE"
statement = connection.prepareStatement(selectSql)
statement.setString(1, partitionKey)
val rs = statement.executeQuery()
if (rs.next()) {
// 锁已存在,获取失败
connection.rollback()
logger.warn(s"Lock for key [$partitionKey] is already held.")
false
} else {
// 锁不存在,可以获取
val insertSql = "INSERT INTO computation_locks (lock_key, job_id, expires_at) VALUES (?, ?, NOW() + INTERVAL '15 minutes')"
val insertStatement = connection.prepareStatement(insertSql)
insertStatement.setString(1, partitionKey)
insertStatement.setString(2, "current_spark_job_id")
insertStatement.executeUpdate()
connection.commit()
logger.info(s"Successfully acquired lock for key [$partitionKey].")
true
}
} catch {
case e: Exception =>
if (connection != null) connection.rollback()
logger.error(s"Error acquiring lock for key [$partitionKey]", e)
false
} finally {
// 关闭资源
if (statement != null) statement.close()
if (connection != null) {
connection.setAutoCommit(true)
connection.close()
}
}
}
这个方案的优点是显而易见的:实现简单,技术栈收敛,不需要引入新的中间件。然而,在真实项目中,这个方案的脆弱性很快就暴露了:
- 数据库成为性能瓶颈:当数千个 Spark Executor Task 并发请求锁时,数据库的连接数和事务处理能力会立刻成为瓶颈。
FOR UPDATE
会产生行级锁,在高并发下,锁等待会迅速累积,甚至导致整个锁表的热点竞争,拖慢所有计算作业。 - 可用性问题:整个特征计算平台强依赖于这个中心化的数据库。一旦数据库发生抖动或宕机,所有计算任务都会失败或停滞,这是一个典型的单点故障。
- 僵尸锁处理复杂:如果一个持有锁的 Spark Executor 异常崩溃,它没有机会执行
DELETE
操作来释放锁。我们必须依赖一个额外的定时清理任务,根据expires_at
字段来删除过期锁。这个清理任务本身的设计和维护也带来了额外的复杂性。
在生产环境中,我们观察到大量的作业因为等待数据库锁而导致处理时间从10分钟延长到超过1小时,这是完全无法接受的。
方案二:基于 Redis 的高性能分布式锁
我们决定转向一个专用的分布式协调系统。团队对 Redis 有深入的运维经验,因此它成为首选。相比数据库,Redis 提供了原子操作(如 SETNX
)、高性能的内存读写以及键过期机制,这些特性天然适合分布式锁的场景。
我们选择使用 Redisson,一个成熟的 Redis Java 客户端,它提供了健壮的、可重入的分布式锁实现,并内置了“看门狗”(watchdog)机制来处理锁的自动续期,有效防止了业务逻辑执行时间超过锁的初始 TTL 导致的锁失效问题。
下面是这个方案的整体架构设计。
graph TD subgraph "配置中心" Apollo(Apollo Client) end subgraph "调度与计算层" Scheduler(作业调度器 e.g., Airflow) SparkCluster[Apache Spark 集群] SparkDriver(Spark Driver) Executor1(Spark Executor 1) Executor2(Spark Executor 2) ExecutorN(Spark Executor N) end subgraph "协调服务" Redis(Redis Sentinel/Cluster) end subgraph "数据存储与服务层" DataLake(数据湖 Delta Lake / Hudi) GraphQLService(GraphQL API 服务) end subgraph "消费方" MLModel(机器学习模型服务) Dashboard(风控分析平台) end Apollo -- "下发作业配置(e.g., lock TTL, feature logic)" --> SparkDriver Scheduler -- "触发" --> SparkDriver SparkDriver -- "分发任务" --> Executor1 SparkDriver -- "分发任务" --> Executor2 SparkDriver -- "分发任务" --> ExecutorN Executor1 -- "1. 尝试获取锁 RLock.tryLock()" --> Redis Executor2 -- "1. 尝试获取锁 RLock.tryLock()" --> Redis ExecutorN -- "1. 尝试获取锁 RLock.tryLock()" --> Redis Redis -- "2. 返回锁状态" --> Executor1 Executor1 -- "3. 若成功, 计算特征" --> DataLake Executor1 -- "4. 计算完成, 释放锁 RLock.unlock()" --> Redis GraphQLService -- "查询特征" --> DataLake MLModel -- "GraphQL Query" --> GraphQLService Dashboard -- "GraphQL Query" --> GraphQLService
1. 分布式锁管理器实现
我们封装了一个 DistributedLockManager
,供 Spark 作业调用。
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class DistributedLockManager {
private static final Logger logger = LoggerFactory.getLogger(DistributedLockManager.class);
private static volatile RedissonClient redissonClient;
// 使用双重检查锁定模式实现单例
public static void initialize(String redisAddress, String password) {
if (redissonClient == null) {
synchronized (DistributedLockManager.class) {
if (redissonClient == null) {
Config config = new Config();
// 假设使用单点模式用于演示,生产环境应使用哨兵或集群模式
// config.useSentinelServers().addSentinelAddress(...);
config.useSingleServer()
.setAddress(redisAddress)
.setPassword(password)
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(10);
redissonClient = Redisson.create(config);
logger.info("Redisson client initialized successfully.");
}
}
}
}
public static RedissonClient getClient() {
if (redissonClient == null) {
throw new IllegalStateException("DistributedLockManager has not been initialized. Call initialize() first.");
}
return redissonClient;
}
/**
* 执行一个被分布式锁保护的代码块。
* 这是一个关键的封装,确保了锁的正确获取和释放。
*
* @param lockKey 锁的唯一标识
* @param waitTimeSeconds 尝试获取锁的最大等待时间
* @param leaseTimeSeconds 锁的持有时间(租约时间)。Redisson的看门狗会自动续期。
* @param task 要执行的任务,它应该返回一个结果
* @param <T> 任务返回值的类型
* @return 任务的返回值,如果获取锁失败则返回 null
*/
public static <T> T executeWithLock(String lockKey, long waitTimeSeconds, long leaseTimeSeconds, Supplier<T> task) {
RLock lock = getClient().getLock(lockKey);
boolean isLocked = false;
try {
// 尝试在指定时间内获取锁
isLocked = lock.tryLock(waitTimeSeconds, leaseTimeSeconds, TimeUnit.SECONDS);
if (isLocked) {
logger.debug("Lock acquired for key: {}", lockKey);
// 成功获取锁,执行业务逻辑
return task.get();
} else {
logger.warn("Failed to acquire lock for key: {} within {} seconds.", lockKey, waitTimeSeconds);
return null; // 或者可以抛出自定义异常
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread was interrupted while trying to acquire lock for key: {}", lockKey, e);
return null;
} finally {
// 必须在 finally 块中释放锁,且只有在当前线程持有锁时才释放
if (isLocked && lock.isHeldByCurrentThread()) {
lock.unlock();
logger.debug("Lock released for key: {}", lockKey);
}
}
}
public static void shutdown() {
if (redissonClient != null) {
redissonClient.shutdown();
}
}
}
设计考量:
- 单例模式:
RedissonClient
是一个重量级对象,包含连接池和线程池。在整个 Spark 应用(Driver 和 Executors)中,它应该是单例的。我们通过双重检查锁定来确保其线程安全地初始化。 - 高阶函数封装:
executeWithLock
方法接受一个Supplier<T>
函数式接口。这种设计模式将锁的获取和释放逻辑与业务逻辑解耦,极大地降低了业务代码出错的风险。使用者只需要关注自己的业务task
,无需手动管理lock.unlock()
,避免了忘记释放锁的常见错误。 - 精细化的超时控制:
tryLock
提供了waitTime
和leaseTime
两个关键参数。waitTime
防止任务因无法获取锁而无限期阻塞。leaseTime
结合 Redisson 的看门狗机制,即使任务执行时间超过预期,只要客户端存活,锁就不会被意外释放。
2. Spark 作业与锁的集成
Spark 作业的核心逻辑现在变得非常清晰。它在处理每个数据分区时,都会调用 DistributedLockManager
。
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import com.typesafe.config.ConfigFactory // 用于加载配置
object FeatureComputationJob {
private val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
// Apollo/本地配置加载 (生产中这部分逻辑会更复杂)
val appConfig = ConfigFactory.load()
val redisAddress = appConfig.getString("redis.address")
val redisPassword = appConfig.getString("redis.password")
val lockWaitTime = appConfig.getLong("lock.wait_time_seconds")
val lockLeaseTime = appConfig.getLong("lock.lease_time_seconds")
// 在 Driver 端初始化分布式锁管理器
DistributedLockManager.initialize(redisAddress, redisPassword)
val spark = SparkSession.builder()
.appName("FeatureComputationJobWithDistributedLock")
.getOrCreate()
// 读取源数据
val sourceData: DataFrame = spark.read.parquet("/path/to/raw/data")
// 假设我们按 user_id 进行分区计算
// repartition 的目的是为了后续的 foreachPartition 操作
val partitionedData = sourceData.repartition(1000, col("user_id"))
partitionedData.foreachPartition { partitionIterator =>
// 这段代码在 Executor 上执行
// 再次确保 Executor 上的 RedissonClient 实例被初始化
// Spark 的闭包序列化机制,单例对象需要这样处理
if (DistributedLockManager.getClient() == null) {
DistributedLockManager.initialize(redisAddress, redisPassword)
}
// 对分区内的数据进行分组,因为一个 RDD 分区可能包含多个 user_id
val groupedData = partitionIterator.toList.groupBy(_.getAs[String]("user_id"))
groupedData.foreach { case (userId, rows) =>
val lockKey = s"feature_computation:user_features:${userId}"
// 使用我们封装好的高阶函数
val result = DistributedLockManager.executeWithLock(lockKey, lockWaitTime, lockLeaseTime, () => {
// --- 业务逻辑开始 ---
logger.info(s"Processing feature for user $userId")
// 伪代码:执行复杂的特征计算
// val features = calculateFeatures(rows)
// saveFeaturesToDataLake(userId, features)
// 返回一个处理结果,比如处理的行数
rows.size
// --- 业务逻辑结束 ---
})
result match {
case Some(rowCount) => logger.info(s"Successfully processed $rowCount rows for user $userId.")
case None => logger.warn(s"Skipped processing for user $userId due to lock contention or error.")
}
}
}
spark.stop()
DistributedLockManager.shutdown()
}
}
单元测试思路:
对 DistributedLockManager
的测试至关重要。我们可以使用 testcontainers
库启动一个临时的 Redis 实例。测试用例应覆盖:
- 单线程成功获取和释放锁。
- 两个并发线程争用同一个锁,只有一个能成功。
-
tryLock
在等待超时后正确返回false
。 - 任务执行时抛出异常,锁依然能被正确释放。
3. 使用 Apollo Client 实现动态配置
硬编码 Redis 地址、超时时间等参数是生产环境的大忌。我们使用 Apollo 配置中心来管理这些参数。
Spark 作业启动时,Driver 会从 Apollo 拉取最新的配置,包括 Redis 连接信息、锁的 waitTime
和 leaseTime
、特征计算的业务开关等。这使得我们可以在不重新打包和部署 Spark 应用的情况下,在线调整锁的行为策略或开启/关闭某些特征的计算。
// 在 Spark Driver 的初始化部分
// 使用 Apollo 官方客户端
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
public class ConfigManager {
private static final String APP_ID = "spark-feature-platform";
private static final String NAMESPACE = "application";
static {
// 在 JVM 参数中设置 Apollo Meta Server 地址和 App ID
System.setProperty("app.id", APP_ID);
// System.setProperty("apollo.meta", "http://apollo-meta-server");
}
private final Config config;
public ConfigManager() {
this.config = ConfigService.getConfig(NAMESPACE);
}
public String getRedisAddress() {
// "default_value" 提供了容错能力
return config.getProperty("redis.address", "redis://localhost:6379");
}
public long getLockWaitTime() {
return config.getLongProperty("lock.wait_time_seconds", 5L);
}
// ... 其他配置获取方法
}
一个常见的错误是在 Spark 的 map
或 foreachPartition
闭包中直接初始化 Apollo 客户端。这会导致每个 Task 都尝试连接 Apollo,造成不必要的开销和潜在问题。正确的做法是在 Driver 端一次性拉取所有需要的配置,然后通过闭包将这些值(而不是配置客户端对象)传递给 Executors。
4. GraphQL 作为特征服务层
特征计算完成后,数据被写入数据湖(例如 Delta Lake 格式)。下游的机器学习模型服务需要一个高效、灵活的方式来查询这些特征。相比于传统的 RESTful API,GraphQL 在这里展现了巨大优势。
- 精确获取: 模型服务可能只需要一个用户上百个特征中的某几个。GraphQL 允许客户端精确声明所需字段,避免了不必要的数据传输。
- 强类型 Schema: 特征的名称、类型、描述都在 GraphQL Schema 中被严格定义,构成了自文档化的 API。
- 演进性: 新增特征只需要在 Schema 和后端实现中添加,不会影响到现有的客户端查询。
一个简化的 GraphQL Schema 定义可能如下:
# schema.graphqls
type Query {
"""
获取用户的特征集
"""
getUserFeatures(
userId: ID!,
"""
指定要查询的特征名称列表,如果为空,则返回所有可用特征
"""
featureNames: [String!]
"""
查询特定时间点的特征快照,用于模型回溯。格式:ISO 8601
"""
asOf: String
): UserFeatures
}
type UserFeatures {
userId: ID!
"""
特征键值对
"""
features: [Feature!]
"""
特征版本或计算时间戳
"""
timestamp: String!
}
type Feature {
name: String!
value: String! # 使用 String 类型以获得最大的灵活性,具体类型由消费方解析
valueType: FeatureType!
}
enum FeatureType {
NUMERIC
CATEGORICAL
BOOLEAN
}
后端的 GraphQL 服务(可以使用 Spring for GraphQL 或 DGS 等框架)的 DataFetcher
负责解析查询,然后与数据湖进行交互(例如,通过 Spark Thrift Server, Trino, 或者直接读取 Delta Lake 文件),获取数据并组装成响应。它与前面的计算流程是完全解耦的,只负责读取最终结果。
架构的扩展性与局限性
这套基于 Redis 分布式锁的 Spark 计算架构,结合 Apollo 的动态配置和 GraphQL 的灵活服务,解决了我们最初面临的核心一致性问题,并为平台带来了良好的可维护性和扩展性。我们可以通过向 Apollo 添加新配置来定义新的特征组,开发新的 Spark 作业,而无需改动核心的锁和调度框架。
然而,这个架构并非万能。
它的主要适用场景是近实时(Near Real-time)的批处理计算,延迟在分钟级别。对于需要毫秒级响应的实时特征(例如,用户在当前会话中的点击次数),这套体系就显得过重。这类需求需要一个基于流处理(如 Flink)的独立管道来补充。
其次,当计算集群规模和作业密度达到一个极高的水平时,Redis 本身也可能成为瓶颈。尽管 Redis 性能卓越,但每秒数万次的 tryLock
请求仍然会消耗其 CPU 资源。届时,可能需要对锁的键进行分片,将压力分散到多个 Redis 集群,或者在作业调度层面进行优化,引入更智能的避让策略,从源头上减少锁的争用。
最后,对分布式锁的依赖也引入了新的运维复杂性。对 Redis 集群的监控、告警和故障恢复预案必须做得非常完善,因为它的稳定性直接关系到整个数据管道的健康。一个常见的坑在于对 Redlock 算法的盲目信任,需要清楚地认识到在存在网络分区和严重 GC 停顿的极端情况下,它并不能提供绝对的安全保证。对于我们的金融风控场景,当前的一致性保证级别与业务风险是匹配的,但这始终是一个需要持续评估的权衡点。