In any serious machine learning system, training-serving skew is a lurking, hard-to-eradicate problem. One of its core causes is that the features used for real-time online inference diverge, in subtle but fatal ways, from the features used for offline training, because they are generated by slightly different logic. The root issue is that feature engineering logic iterates continuously, yet we usually lack a mechanism to map each iteration's version precisely onto every API request served online. When model performance degrades, a key question becomes hard to answer: "Which version of the feature logic produced this prediction?"
To address this pain point, we set out to build a real-time feature service with high throughput, low latency, and version traceability. The goal is not a simple KV store, but an observable API that tightly binds feature data to the DVC (Data Version Control) version metadata behind it.
Technology Selection: A Pragmatic Combination
Given the requirements of high write throughput, fast lookups by entity ID, and versioned traceability, the technology stack had to be chosen carefully.
Core service framework: Spring Boot
We chose Spring Boot with its WebFlux reactive programming model. In IO-bound scenarios, the reactive model uses system resources more efficiently and handles high concurrency better. Its mature ecosystem makes integrating the data access layer and observability components straightforward.
Data storage: Apache Cassandra
A feature store's read/write pattern is very typical: massive writes (features are updated continuously) and point lookups by primary key (the model queries by entity ID at inference time). This is exactly Cassandra's strength. Its masterless architecture and linear scalability deliver high availability and high write performance. More importantly, its data model lets us implement efficient version management and time-range queries through a carefully designed clustering key.
Logging and observability: Grafana Loki
We need more than plain runtime logs; we need structured logs that support deep analysis of the feature service's behavior, for example "What is the ingestion rate for a given feature version?" or "Is the distribution of feature values for this entity ID anomalous?". Loki's label-based indexing fits these scenarios well, and compared with a full-text-indexed ELK stack it is far more frugal with resources, which suits storing large volumes of operational logs.
Version control: DVC
The Python feature-engineering scripts and preprocessed datasets are all versioned with DVC and Git. The crux is how to propagate the DVC-managed version information (for example, a Git tag) into the online Spring Boot service and associate it with the persisted feature data. Our approach is to inject the DVC version identifier into the application's build artifact during the CI/CD pipeline.
Architecture and Data Flow
The system's data flow is straightforward.
graph TD
subgraph "Offline: DVC & CI/CD"
A[Feature Engineering Scripts] -- DVC Track --> B(Git Repository)
B -- CI/CD Pipeline --> C{Build Spring Boot App}
D[DVC Tag/Commit Hash] -- Inject --> C
end
subgraph "Online: Real-time Feature Service"
E(Upstream Data Source) -- Feature Ingestion --> F[Spring Boot API]
F -- Structured Logs --> G[Loki]
F -- Write/Read Features --> H[Cassandra Cluster]
I(ML Model Service) -- Get Features --> F
end
subgraph "Observability"
G -- Log Queries --> J(Grafana)
J -- Visualize --> K(SRE/Data Scientist)
end
Step-by-Step Implementation: Code Is the Best Explanation
1. Cassandra Data Model Design
This is the foundation of the whole system. We need to store each entity's feature vector and retrieve it by version and timestamp.
- entity_id: the query subject, e.g. user_id or device_id. It is the partition key, ensuring all data for one entity lands on the same node.
- feature_set_version: the version of the feature engineering logic, taken from DVC. It is the first clustering key, letting us filter by version within a partition.
- event_timestamp: when the features were generated. It is the second clustering key, giving time ordering within a partition.
-- Keyspace and Table Definition
CREATE KEYSPACE IF NOT EXISTS feature_store
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};
USE feature_store;
CREATE TABLE IF NOT EXISTS entity_features (
entity_id TEXT,
feature_set_version TEXT,
event_timestamp TIMESTAMP,
features MAP<TEXT, DOUBLE>,
PRIMARY KEY ((entity_id), feature_set_version, event_timestamp)
) WITH CLUSTERING ORDER BY (feature_set_version DESC, event_timestamp DESC);
The nice property of this design is that, thanks to CLUSTERING ORDER BY, Cassandra keeps the newest version and the newest data first, which makes the most common query, "the latest features for an entity", extremely efficient. One caveat: a TEXT clustering column sorts lexicographically, not semantically, so version strings should use a sortable format (zero-padded, or timestamp-prefixed), as the check below illustrates.
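A quick, self-contained illustration of that caveat (the class is purely illustrative):
// TEXT clustering columns compare lexicographically, so semantic versions
// do not order numerically: "v1.10.0" sorts before "v1.9.0".
public class VersionOrderingCaveat {
    public static void main(String[] args) {
        System.out.println("v1.10.0".compareTo("v1.9.0") < 0); // prints true
    }
}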
2. Core Spring Boot Application Implementation
First, the project dependencies: we need spring-boot-starter-webflux, spring-boot-starter-data-cassandra-reactive, and the Loki logback appender.
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>
<!-- Loki logging appender -->
<dependency>
<groupId>com.github.loki4j</groupId>
<artifactId>loki-logback-appender</artifactId>
<version>1.4.1</version>
</dependency>
<!-- Required for JSON layout -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.4</version>
</dependency>
</dependencies>
Next comes the application configuration.
# application.yml
server:
port: 8080
spring:
application:
name: real-time-feature-store
data:
cassandra:
keyspace-name: feature_store
contact-points: cassandra-node1,cassandra-node2
port: 9042
local-datacenter: datacenter1
schema-action: NONE
# Custom property for DVC version
app:
feature-set-version: "v1.0.0" # This will be replaced by CI/CD
The app.feature-set-version property is the key. In the CI/CD pipeline we replace this default value with the build's DVC tag or Git commit hash using sed or a similar tool, e.g. sed -i "s/v1.0.0/${DVC_TAG}/" src/main/resources/application.yml (the DVC_TAG variable name is illustrative).
Entity and Repository
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import java.time.Instant;
import java.util.Map;
@Table("entity_features")
public class EntityFeature {
@PrimaryKey
private EntityFeatureKey key;
private Map<String, Double> features;
// Constructors, Getters, Setters...
}
// Composite Primary Key Class
import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyClass;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import java.io.Serializable;
import java.time.Instant;
@PrimaryKeyClass
public class EntityFeatureKey implements Serializable {
@PrimaryKeyColumn(name = "entity_id", type = PrimaryKeyType.PARTITIONED)
private String entityId;
@PrimaryKeyColumn(name = "feature_set_version", ordinal = 0, ordering = Ordering.DESCENDING)
private String featureSetVersion;
@PrimaryKeyColumn(name = "event_timestamp", ordinal = 1, ordering = Ordering.DESCENDING)
private Instant eventTimestamp;
// All-args constructor, equals, hashCode, Getters, Setters...
}
import org.springframework.data.cassandra.repository.Query;
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface FeatureRepository extends ReactiveCassandraRepository<EntityFeature, EntityFeatureKey> {
/**
* Finds the latest feature vector for a given entity across all versions.
* Due to the clustering order, the first result is the latest.
*/
@Query("SELECT * FROM entity_features WHERE entity_id = ?0 LIMIT 1")
Mono<EntityFeature> findLatestByEntityId(String entityId);
/**
* Finds feature vectors for a specific entity and version.
*/
Flux<EntityFeature> findByKeyEntityIdAndKeyFeatureSetVersion(String entityId, String version);
}
Service and API Layer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.Map;
@Service
public class FeatureService {
private static final Logger log = LoggerFactory.getLogger(FeatureService.class);
private final FeatureRepository repository;
private final String currentFeatureSetVersion;
public FeatureService(FeatureRepository repository, @Value("${app.feature-set-version}") String version) {
this.repository = repository;
this.currentFeatureSetVersion = version;
log.info("FeatureService initialized with feature_set_version: {}", this.currentFeatureSetVersion);
}
public Mono<EntityFeature> ingest(String entityId, Map<String, Double> features) {
// A real project would have much more robust validation here.
if (entityId == null || features == null || features.isEmpty()) {
return Mono.error(new IllegalArgumentException("Entity ID and features cannot be empty."));
}
long startTime = System.nanoTime();
EntityFeatureKey key = new EntityFeatureKey(entityId, this.currentFeatureSetVersion, Instant.now());
EntityFeature entityFeature = new EntityFeature(key, features);
// Set the MDC just before logging, inside the callback itself: WebFlux may
// run these callbacks on a different thread, so thread-local context set at
// assembly time is not guaranteed to be visible here.
return repository.save(entityFeature)
        .doOnSuccess(saved -> {
            long durationMs = (System.nanoTime() - startTime) / 1_000_000;
            MDC.put("entityId", entityId);
            MDC.put("featureSetVersion", this.currentFeatureSetVersion);
            MDC.put("featureCount", String.valueOf(features.size()));
            MDC.put("ingestionLatencyMs", String.valueOf(durationMs));
            log.info("Successfully ingested features.");
            MDC.clear();
        })
        .doOnError(e -> {
            MDC.put("entityId", entityId);
            MDC.put("featureSetVersion", this.currentFeatureSetVersion);
            log.error("Failed to ingest features.", e);
            MDC.clear();
        });
}
public Mono<EntityFeature> getLatestFeatures(String entityId) {
return repository.findLatestByEntityId(entityId);
}
public Mono<EntityFeature> getFeaturesByVersion(String entityId, String version) {
// In a real-world scenario, you might want to return a Flux
// and let the client decide if they need more than one record.
// For simplicity, we take the first one, which is the latest for that version.
return repository.findByKeyEntityIdAndKeyFeatureSetVersion(entityId, version).next();
}
}
Note the use of MDC (Mapped Diagnostic Context) to attach context to the logs; this is key to high-quality structured logging. Because reactive pipelines can hop threads, we set and clear the MDC inside the same callback that emits the log, rather than at assembly time. A more principled alternative is to carry the context through Reactor's Context, as sketched below.
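A minimal sketch of that alternative, assuming Reactor 3.4+; the ReactiveLogging class and the "mdc" context key are illustrative, not library API:
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.MDC;
import reactor.core.publisher.Signal;

public final class ReactiveLogging {

    private ReactiveLogging() {}

    // Wraps a log statement so it runs with MDC entries taken from the
    // Reactor Context attached to the signal, then cleans the MDC up again.
    public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
        return signal -> {
            if (!signal.isOnNext()) return;
            Map<String, String> mdc = signal.getContextView().getOrDefault("mdc", Map.of());
            try {
                mdc.forEach(MDC::put);
                logStatement.accept(signal.get());
            } finally {
                MDC.clear();
            }
        };
    }
}
A pipeline would then log with .doOnEach(ReactiveLogging.logOnNext(f -> log.info("Successfully ingested features."))) and attach the values downstream with .contextWrite(ctx -> ctx.put("mdc", Map.of("entityId", entityId))).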
The Controller Layer
The API design should clearly reflect its function.
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import java.util.Map;
@RestController
@RequestMapping("/api/v1/features")
public class FeatureController {
private final FeatureService featureService;
public FeatureController(FeatureService featureService) {
this.featureService = featureService;
}
@PostMapping("/{entityId}")
public Mono<ResponseEntity<Void>> ingestFeatures(
@PathVariable String entityId,
@RequestBody Map<String, Double> features) {
return featureService.ingest(entityId, features)
.then(Mono.just(ResponseEntity.status(HttpStatus.CREATED).<Void>build()))
.onErrorResume(IllegalArgumentException.class, e ->
Mono.just(ResponseEntity.badRequest().build()));
}
@GetMapping("/{entityId}/latest")
public Mono<ResponseEntity<Map<String, Double>>> getLatestFeatures(@PathVariable String entityId) {
return featureService.getLatestFeatures(entityId)
.map(feature -> ResponseEntity.ok(feature.getFeatures()))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping("/{entityId}")
public Mono<ResponseEntity<Map<String, Double>>> getFeaturesByVersion(
@PathVariable String entityId,
@RequestParam("version") String version) {
return featureService.getFeaturesByVersion(entityId, version)
.map(feature -> ResponseEntity.ok(feature.getFeatures()))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
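A minimal endpoint test sketch, assuming spring-boot-starter-test (JUnit 5, Mockito) on the classpath; @WebFluxTest slices the context to the controller and @MockBean replaces the service, so no Cassandra instance is needed. The test class and values are illustrative:
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

@WebFluxTest(FeatureController.class)
class FeatureControllerTest {

    @Autowired
    private WebTestClient client;

    @MockBean
    private FeatureService featureService;

    @Test
    void ingestReturns201OnSuccess() {
        // The controller only waits for completion, so an empty Mono suffices.
        when(featureService.ingest(anyString(), anyMap())).thenReturn(Mono.empty());

        client.post().uri("/api/v1/features/user-42")
                .bodyValue(Map.of("ctr_7d", 0.031))
                .exchange()
                .expectStatus().isCreated();
    }
}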
3. Shipping Structured Logs to Loki
This is the step that wires up observability. Create logback-spring.xml under src/main/resources.
<configuration>
<springProperty scope="context" name="LOKI_URL" source="loki.url" defaultValue="http://localhost:3100/loki/api/v1/push"/>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
<http>
<url>${LOKI_URL}</url>
</http>
<format>
<label>
<pattern>app=${APP_NAME},host=${HOSTNAME},level=%level,version=${app.feature-set-version:-unknown}</pattern>
<!-- Key part: Read featureSetVersion from MDC if present -->
<readMdcValuesFrom>featureSetVersion</readMdcValuesFrom>
</label>
<message>
<pattern>
{
"level": "%level",
"service": "${APP_NAME}",
"thread": "%thread",
"logger": "%logger",
"message": "%message",
"stack_trace": "%ex",
"mdc": "%mdc"
}
</pattern>
</message>
</format>
<batchTimeout>5000</batchTimeout>
<batchSize>1000</batchSize>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<root level="INFO">
<appender-ref ref="LOKI" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
This configuration does several important things:
- It defines a LOKI appender pointing at Loki's push API.
- The <label> section turns the application name, hostname, log level, and the all-important feature-set version into Loki labels. Labels are Loki's index for efficient queries, so the set is deliberately kept small and low-cardinality.
- The <message> pattern renders each log line as JSON, emitting the MDC entries (entityId, featureSetVersion, ingestionLatencyMs, ...) as top-level fields so that LogQL's json parser can extract and unwrap them.
- The CONSOLE appender uses LogstashEncoder so local output stays structured as well.
Now every call to featureService.ingest produces a context-rich structured log line that is shipped to Loki.
In Grafana we can then run a LogQL query like the following to monitor ingestion latency per feature version:
# Calculate the 95th percentile ingestion latency per feature version over the last 15 minutes
quantile_over_time(0.95,
{app="real-time-feature-store"} | json | unwrap ingestionLatencyMs | __error__="" [15m]
) by (featureSetVersion)
Limitations and Future Iteration Paths
This design effectively solves feature-version traceability and observability, but in a real production environment it is not the end of the road.
First, injecting the version via application.yml and CI/CD is simple and direct but inflexible: the service must restart to pick up a new version. A more advanced approach is a dynamic configuration center (such as Consul or Nacos): after deploying new model or feature logic, the CI/CD pipeline updates the config center, and application instances pull the latest version identifier dynamically, as sketched below.
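A minimal sketch of that direction, assuming spring-cloud-context (plus a Consul/Nacos client) on the classpath; the FeatureSetVersionProvider class is illustrative, and FeatureService would read the version through it instead of holding a final field:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

// Re-created on a context refresh event, so @Value is re-resolved from the
// config center without restarting the service.
@RefreshScope
@Component
public class FeatureSetVersionProvider {

    @Value("${app.feature-set-version}")
    private String version;

    public String current() {
        return version;
    }
}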
Second, the current Cassandra schema does not handle evolution of the features themselves. If v2.0.0 adds a feature relative to v1.0.0, or changes a feature's data type, the MAP<TEXT, DOUBLE> model cannot record that change. This calls for an external schema registry, or an extra per-version schema record stored in Cassandra; a minimal guard is sketched after this paragraph.
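A minimal in-process sketch of such a guard, with hypothetical feature names; in practice the expected schema per version would be loaded from a registry or a dedicated Cassandra table rather than hard-coded:
import java.util.Map;
import java.util.Set;

public class FeatureSchemaGuard {

    // Expected feature names per feature-set version (hypothetical contents).
    private static final Map<String, Set<String>> EXPECTED = Map.of(
            "v1.0.0", Set.of("ctr_7d", "avg_session_len"),
            "v2.0.0", Set.of("ctr_7d", "avg_session_len", "recency_score"));

    // Rejects payloads whose keys do not exactly match the registered schema.
    public static void validate(String version, Map<String, Double> features) {
        Set<String> expected = EXPECTED.get(version);
        if (expected == null || !expected.equals(features.keySet())) {
            throw new IllegalArgumentException(
                    "Feature payload does not match schema for " + version);
        }
    }
}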
Finally, the read path can be optimized further. For hot entities, Caffeine can serve as a local first-level cache inside the Spring Boot service, or Redis as a distributed second-level cache, reducing direct read pressure on Cassandra and shaving inference latency. This introduces the new challenge of cache consistency and needs careful trade-offs; a local-cache sketch follows.
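A minimal sketch of the local-cache idea, assuming the com.github.ben-manes.caffeine:caffeine dependency; CachedFeatureReader is an illustrative wrapper, and the short TTL bounds staleness rather than solving consistency:
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import reactor.core.publisher.Mono;

public class CachedFeatureReader {

    private final FeatureService delegate;

    // Bounded, short-lived cache: entries expire quickly, trading a small
    // staleness window for far fewer Cassandra reads on hot entities.
    private final AsyncCache<String, EntityFeature> cache = Caffeine.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(Duration.ofSeconds(30))
            .buildAsync();

    public CachedFeatureReader(FeatureService delegate) {
        this.delegate = delegate;
    }

    public Mono<EntityFeature> getLatestFeatures(String entityId) {
        // On a miss, load from Cassandra via the service and populate the cache.
        return Mono.fromFuture(cache.get(entityId,
                (id, executor) -> delegate.getLatestFeatures(id).toFuture()));
    }
}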