Building a High-Throughput Real-Time Feature API with DVC Version Traceability on Spring Boot and Cassandra


In any serious machine learning system, training-serving skew is a latent, hard-to-eradicate problem. One of its core causes is that the features used for online real-time inference and the features used for offline model training drift apart in subtle but fatal ways in their generation logic. The root of the problem: feature-engineering logic is continuously iterated on, yet we usually lack a mechanism that maps each iteration's version precisely onto every API request the online service handles. When model performance degrades, we struggle to answer a key question: "Which version of the feature logic produced this prediction?"

To address this pain point, we set out to build a real-time feature service that is high-throughput, low-latency, and version-traceable. The goal of this project is not a simple KV store, but an observable API that tightly binds feature data to its DVC (Data Version Control) version metadata.

Technology Choices: A Pragmatic Combination

Given the requirements of high write throughput, fast lookups by entity ID, and versioned traceability, the stack had to be chosen carefully.

  1. Core service framework: Spring Boot
    We chose Spring Boot with its WebFlux reactive programming model. In IO-bound scenarios, the reactive model uses system resources more efficiently and copes better with high concurrency. Its mature ecosystem also makes it easy to integrate the data access layer and observability components.

  2. Data store: Apache Cassandra
    The read/write pattern of a feature store is highly characteristic: massive writes (features are continuously updated) and point reads by primary key (lookups by entity ID at inference time). This plays exactly to Cassandra's strengths. Its masterless architecture and linear scalability deliver high availability and high write throughput. More importantly, its data model lets us implement efficient version management and time-range queries through carefully designed clustering keys.

  3. Logging and observability: Grafana Loki
    We need more than plain runtime logs; we need structured logs that support deep analysis of the service's behavior, e.g. "What is the ingestion rate for a particular feature version?" or "Is the feature-value distribution for a given entity ID anomalous?". Loki's label-based indexing fits these scenarios well; compared with a full-text-indexed ELK stack, it is far more frugal with resources and well suited to storing high volumes of operational logs.

  4. Version control: DVC
    The Python feature-engineering scripts and preprocessed datasets are versioned with DVC and Git. The crux is how to carry the DVC version identifier (e.g. a Git tag) into the online Spring Boot service and associate it with the persisted feature data. Our approach is to inject the DVC version identifier into the application's build artifact during the CI/CD pipeline.

Architecture and Data Flow

The system's data flow is straightforward.

graph TD
    subgraph "Offline: DVC & CI/CD"
        A[Feature Engineering Scripts] -- DVC Track --> B(Git Repository)
        B -- CI/CD Pipeline --> C{Build Spring Boot App}
        D[DVC Tag/Commit Hash] -- Inject --> C
    end

    subgraph "Online: Real-time Feature Service"
        E(Upstream Data Source) -- Feature Ingestion --> F[Spring Boot API]
        F -- Structured Logs --> G[Loki]
        F -- Write/Read Features --> H[Cassandra Cluster]
        I(ML Model Service) -- Get Features --> F
    end

    subgraph "Observability"
        G -- Log Queries --> J(Grafana)
        J -- Visualize --> K(SRE/Data Scientist)
    end

Step-by-Step Implementation: Code Is the Best Explanation

1. Cassandra Data Model Design

This is the foundation of the whole system. We need to store each entity's feature vector and retrieve it by version and timestamp.

  • entity_id: the query subject, e.g. user_id or device_id. Used as the partition key, ensuring all data for one entity lands on the same node.
  • feature_set_version: the version of the feature-engineering logic, taken from DVC. Used as a clustering key so we can filter by version within a partition.
  • event_timestamp: when the feature was generated. Used as the second clustering key for time ordering within the partition.

-- Keyspace and Table Definition
-- SimpleStrategy is fine for a single-datacenter dev setup; production
-- multi-DC clusters should use NetworkTopologyStrategy instead.
CREATE KEYSPACE IF NOT EXISTS feature_store
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};

USE feature_store;

CREATE TABLE IF NOT EXISTS entity_features (
    entity_id TEXT,
    feature_set_version TEXT,
    event_timestamp TIMESTAMP,
    features MAP<TEXT, DOUBLE>,
    PRIMARY KEY ((entity_id), feature_set_version, event_timestamp)
) WITH CLUSTERING ORDER BY (feature_set_version DESC, event_timestamp DESC);

The elegance of this design is that, thanks to CLUSTERING ORDER BY, Cassandra physically stores the newest version and the newest rows first, which makes the most common operation, "fetch the latest features for an entity", extremely cheap. One caveat: feature_set_version is TEXT, so the DESC ordering is lexicographic; for "latest across versions" to be meaningful, version strings need a sortable naming scheme (e.g. zero-padded or timestamp-prefixed tags).
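
To make the read path concrete, here is a minimal sketch of that point query using the DataStax Java driver directly, assuming a locally reachable cluster; the entity ID is illustrative:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import java.util.Map;

public class LatestFeatureQuery {

    public static void main(String[] args) {
        // Single-partition read: the newest version/timestamp is physically
        // first in the partition, so LIMIT 1 returns it without scanning.
        try (CqlSession session = CqlSession.builder()
                .withKeyspace("feature_store")
                .withLocalDatacenter("datacenter1")
                .build()) {
            Row row = session.execute(SimpleStatement.newInstance(
                    "SELECT feature_set_version, event_timestamp, features "
                  + "FROM entity_features WHERE entity_id = ? LIMIT 1",
                    "user_42")).one();
            if (row != null) {
                Map<String, Double> features = row.getMap("features", String.class, Double.class);
                System.out.printf("version=%s features=%s%n",
                        row.getString("feature_set_version"), features);
            }
        }
    }
}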

2. Core Spring Boot Implementation

First, the project dependencies: we need spring-boot-starter-webflux, spring-boot-starter-data-cassandra-reactive, and a Loki log appender.

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
    </dependency>
    <!-- Loki logging appender -->
    <dependency>
        <groupId>com.github.loki4j</groupId>
        <artifactId>loki-logback-appender</artifactId>
        <version>1.4.1</version>
    </dependency>
    <!-- Required for JSON layout -->
    <dependency>
        <groupId>net.logstash.logback</groupId>
        <artifactId>logstash-logback-encoder</artifactId>
        <version>7.4</version>
    </dependency>
</dependencies>

Next comes the application configuration.

# application.yml
server:
  port: 8080

spring:
  application:
    name: real-time-feature-store
  data:
    cassandra:
      keyspace-name: feature_store
      contact-points: cassandra-node1,cassandra-node2
      port: 9042
      local-datacenter: datacenter1
      schema-action: NONE

# Custom property for DVC version
app:
  feature-set-version: "v1.0.0" # This will be replaced by CI/CD

The app.feature-set-version property is the linchpin here. In the CI/CD pipeline, we use sed or a similar tool to replace this default value with the DVC tag or Git commit hash captured at build time.
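
Since every artifact carries its version, it is useful to expose it for verification after deployment. A minimal sketch, assuming the property is injected as above; the endpoint path and class name are illustrative:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;

// Lets operators and clients confirm which DVC feature-set version
// a running instance was built with.
@RestController
public class VersionController {

    private final String featureSetVersion;

    public VersionController(@Value("${app.feature-set-version}") String featureSetVersion) {
        this.featureSetVersion = featureSetVersion;
    }

    @GetMapping("/api/v1/meta/feature-set-version")
    public Map<String, String> version() {
        return Map.of("featureSetVersion", featureSetVersion);
    }
}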

Entity and Repository

import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import java.time.Instant;
import java.util.Map;

@Table("entity_features")
public class EntityFeature {

    @PrimaryKey
    private EntityFeatureKey key;

    private Map<String, Double> features;

    // All-args constructor, getters, setters...
}

// Composite Primary Key Class
import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyClass;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import java.io.Serializable;
import java.time.Instant;

@PrimaryKeyClass
public class EntityFeatureKey implements Serializable {

    @PrimaryKeyColumn(name = "entity_id", type = PrimaryKeyType.PARTITIONED)
    private String entityId;

    @PrimaryKeyColumn(name = "feature_set_version", ordinal = 0, type = PrimaryKeyType.CLUSTERED, ordering = Ordering.DESCENDING)
    private String featureSetVersion;

    @PrimaryKeyColumn(name = "event_timestamp", ordinal = 1, type = PrimaryKeyType.CLUSTERED, ordering = Ordering.DESCENDING)
    private Instant eventTimestamp;
    
    // All-args constructor, equals, hashCode, getters, setters...
}

import org.springframework.data.cassandra.repository.Query;
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface FeatureRepository extends ReactiveCassandraRepository<EntityFeature, EntityFeatureKey> {

    /**
     * Finds the latest feature vector for a given entity across all versions.
     * Due to the clustering order, the first row in the partition is the latest.
     * Caveat: feature_set_version is TEXT and sorts lexicographically, so this
     * assumes a sortable version naming scheme.
     */
    @Query("SELECT * FROM entity_features WHERE entity_id = ?0 LIMIT 1")
    Mono<EntityFeature> findLatestByEntityId(String entityId);

    /**
     * Finds feature vectors for a specific entity and version.
     */
    Flux<EntityFeature> findByKeyEntityIdAndKeyFeatureSetVersion(String entityId, String version);
}

Service and API Layer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.Map;

@Service
public class FeatureService {

    private static final Logger log = LoggerFactory.getLogger(FeatureService.class);

    private final FeatureRepository repository;
    private final String currentFeatureSetVersion;

    public FeatureService(FeatureRepository repository, @Value("${app.feature-set-version}") String version) {
        this.repository = repository;
        this.currentFeatureSetVersion = version;
        log.info("FeatureService initialized with feature_set_version: {}", this.currentFeatureSetVersion);
    }

    public Mono<EntityFeature> ingest(String entityId, Map<String, Double> features) {
        // A real project would have much more robust validation here.
        if (entityId == null || features == null || features.isEmpty()) {
            return Mono.error(new IllegalArgumentException("Entity ID and features cannot be empty."));
        }

        long startTime = System.nanoTime();
        EntityFeatureKey key = new EntityFeatureKey(entityId, this.currentFeatureSetVersion, Instant.now());
        EntityFeature entityFeature = new EntityFeature(key, features);
        
        // Use MDC for structured logging context. Reactor callbacks may run on
        // a different thread than the caller, so the MDC must be populated
        // inside each signal callback rather than before subscription.
        return repository.save(entityFeature)
            .doOnSuccess(saved -> {
                long durationMs = (System.nanoTime() - startTime) / 1_000_000;
                try {
                    MDC.put("entityId", entityId);
                    MDC.put("featureSetVersion", this.currentFeatureSetVersion);
                    MDC.put("featureCount", String.valueOf(features.size()));
                    MDC.put("ingestionLatencyMs", String.valueOf(durationMs));
                    log.info("Successfully ingested features.");
                } finally {
                    MDC.clear();
                }
            })
            .doOnError(e -> {
                try {
                    MDC.put("entityId", entityId);
                    MDC.put("featureSetVersion", this.currentFeatureSetVersion);
                    log.error("Failed to ingest features.", e);
                } finally {
                    MDC.clear();
                }
            });
    }

    public Mono<EntityFeature> getLatestFeatures(String entityId) {
        return repository.findLatestByEntityId(entityId);
    }
    
    public Mono<EntityFeature> getFeaturesByVersion(String entityId, String version) {
        // In a real-world scenario, you might want to return a Flux
        // and let the client decide if they need more than one record.
        // For simplicity, we take the first one, which is the latest for that version.
        return repository.findByKeyEntityIdAndKeyFeatureSetVersion(entityId, version).next();
    }
}

Note that we use MDC (Mapped Diagnostic Context) to attach context to each log line; this is the key to high-quality structured logging. One subtlety in a reactive stack: MDC is thread-local, while Reactor may execute callbacks on different threads, which is why the service populates the MDC inside the success/error callbacks rather than before subscribing.
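
For a more systematic approach than populating the MDC in each callback, the context can travel in Reactor's own Context and be copied into the MDC only at logging time. A minimal sketch, assuming Reactor's contextWrite/deferContextual API; the helper class and its methods are illustrative:

import org.slf4j.MDC;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public final class MdcContext {

    // Illustrative helper: stash key/value pairs in the Reactor Context
    // at subscription time.
    public static <T> Mono<T> with(Mono<T> source, String entityId, String version) {
        return source.contextWrite(Context.of("entityId", entityId, "featureSetVersion", version));
    }

    // Illustrative helper: copy values from the Reactor Context into the MDC,
    // run the logging action, then clear the MDC again.
    public static Mono<Void> logWithContext(Runnable logAction) {
        return Mono.deferContextual(ctx -> {
            try {
                ctx.<String>getOrEmpty("entityId").ifPresent(v -> MDC.put("entityId", v));
                ctx.<String>getOrEmpty("featureSetVersion").ifPresent(v -> MDC.put("featureSetVersion", v));
                logAction.run();
                return Mono.empty();
            } finally {
                MDC.clear();
            }
        });
    }
}

Wired into the service, ingest would end its chain with contextWrite(...) and perform its logging through logWithContext, so the context survives thread hops regardless of which scheduler runs the callback.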

The Controller Layer

The API design should clearly reflect the service's capabilities.

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/features")
public class FeatureController {

    private final FeatureService featureService;

    public FeatureController(FeatureService featureService) {
        this.featureService = featureService;
    }

    @PostMapping("/{entityId}")
    public Mono<ResponseEntity<Void>> ingestFeatures(
        @PathVariable String entityId, 
        @RequestBody Map<String, Double> features) {
            return featureService.ingest(entityId, features)
                .then(Mono.just(ResponseEntity.status(HttpStatus.CREATED).<Void>build()))
                .onErrorResume(IllegalArgumentException.class, e -> 
                    Mono.just(ResponseEntity.badRequest().build()));
    }

    @GetMapping("/{entityId}/latest")
    public Mono<ResponseEntity<Map<String, Double>>> getLatestFeatures(@PathVariable String entityId) {
        return featureService.getLatestFeatures(entityId)
            .map(feature -> ResponseEntity.ok(feature.getFeatures()))
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping("/{entityId}")
    public Mono<ResponseEntity<Map<String, Double>>> getFeaturesByVersion(
        @PathVariable String entityId,
        @RequestParam("version") String version) {
            return featureService.getFeaturesByVersion(entityId, version)
                .map(feature -> ResponseEntity.ok(feature.getFeatures()))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }
}
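
To sanity-check the endpoints end to end, here is a hedged sketch of a WebFlux integration test, assuming spring-boot-starter-test is on the classpath and a reachable Cassandra instance (e.g. via Testcontainers); entity ID and feature names are illustrative:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;
import java.util.Map;

@SpringBootTest
@AutoConfigureWebTestClient
class FeatureControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    void ingestThenReadLatest() {
        // Ingest a feature vector for a test entity.
        webTestClient.post().uri("/api/v1/features/{entityId}", "user_42")
            .bodyValue(Map.of("clicks_7d", 12.0, "ctr_30d", 0.042))
            .exchange()
            .expectStatus().isCreated();

        // Read the latest features back and verify one value.
        webTestClient.get().uri("/api/v1/features/{entityId}/latest", "user_42")
            .exchange()
            .expectStatus().isOk()
            .expectBody()
            .jsonPath("$.clicks_7d").isEqualTo(12.0);
    }
}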

3. Shipping Structured Logs to Loki

This is the step that wires up observability. Create logback-spring.xml under src/main/resources:

<configuration>
    <springProperty scope="context" name="LOKI_URL" source="loki.url" defaultValue="http://localhost:3100/loki/api/v1/push"/>
    <springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
    <!-- Logback cannot read Spring properties directly, so bridge the version here. -->
    <springProperty scope="context" name="FEATURE_SET_VERSION" source="app.feature-set-version" defaultValue="unknown"/>

    <appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
        <http>
            <url>${LOKI_URL}</url>
        </http>
        <format>
            <label>
                <!-- Labels are Loki's index. The version label is read from the
                     MDC when present, falling back to the build-time property. -->
                <pattern>app=${APP_NAME},host=${HOSTNAME},level=%level,featureSetVersion=%mdc{featureSetVersion:-${FEATURE_SET_VERSION}}</pattern>
            </label>
            <message>
                <!-- Emit MDC keys as explicit JSON fields so LogQL's json
                     parser can extract and unwrap them. -->
                <pattern>
                    {
                    "level": "%level",
                    "service": "${APP_NAME}",
                    "thread": "%thread",
                    "logger": "%logger",
                    "message": "%message",
                    "stack_trace": "%ex",
                    "entityId": "%mdc{entityId:-}",
                    "featureSetVersion": "%mdc{featureSetVersion:-}",
                    "ingestionLatencyMs": "%mdc{ingestionLatencyMs:-}"
                    }
                </pattern>
            </message>
        </format>
        <batchMaxItems>1000</batchMaxItems>
        <batchTimeoutMs>5000</batchTimeoutMs>
    </appender>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>

    <root level="INFO">
        <appender-ref ref="LOKI" />
        <appender-ref ref="CONSOLE" />
    </root>
</configuration>

This configuration does several important things:

  1. It defines a LOKI appender pointing at Loki's push API.
  2. In the <label> section, the application name, host name, log level, and the all-important featureSetVersion become Loki labels. Labels are Loki's index for efficient queries; the version label is read from the MDC when present and falls back to the build-time property.
  3. A springProperty bridge exposes app.feature-set-version to Logback, since Logback cannot resolve Spring properties on its own.
  4. The <message> section renders each log line as JSON and emits the MDC keys (entityId, featureSetVersion, ingestionLatencyMs) as explicit fields, so that LogQL's json parser can extract and unwrap them.

Now every call to featureService.ingest produces a context-rich structured log line that is pushed to Loki.

In Grafana, we can then run a LogQL query like the following to monitor ingestion latency per feature version:

# Calculate the 95th percentile ingestion latency per feature version over the last 15 minutes
quantile_over_time(0.95,
  {app="real-time-feature-store"} | json | unwrap ingestionLatencyMs | __error__="" [15m]
) by (featureSetVersion)

Limitations and Future Iterations

This design effectively solves feature-version traceability and observability, but it is not the end of the road in a real production environment.

First, injecting the version via application.yml and CI/CD is simple and direct but inflexible: the service must restart to pick up a new version. A more advanced approach is a dynamic configuration center (such as Consul or Nacos): after deploying new model or feature logic, the CI/CD pipeline updates the config center and application instances pull the new version identifier dynamically.
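
As a sketch of that direction, assuming Spring Cloud's context support (@RefreshScope) is on the classpath; the class name is illustrative:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

// On a refresh event (e.g. triggered after the config center is updated),
// Spring Cloud rebuilds this bean, so the injected version follows the
// config center without a process restart.
@RefreshScope
@Component
public class FeatureSetVersionHolder {

    private final String version;

    public FeatureSetVersionHolder(@Value("${app.feature-set-version}") String version) {
        this.version = version;
    }

    public String current() {
        return version;
    }
}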

Second, the current Cassandra schema does not handle evolution of the features themselves. If v2.0.0 adds a feature relative to v1.0.0, or changes a feature's data type, the MAP<TEXT, DOUBLE> model cannot record that change. An external schema registry, or an extra per-version schema record stored alongside the data in Cassandra, would address this.
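
One lightweight variant is that extra per-version schema record. A minimal sketch; the table name and field layout are hypothetical:

import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import java.util.Map;

// Hypothetical companion table: one row per feature_set_version describing
// each feature's declared type, so readers can validate values per version.
@Table("feature_set_schemas")
public class FeatureSetSchema {

    @PrimaryKey("feature_set_version")
    private String featureSetVersion;

    // e.g. {"clicks_7d": "DOUBLE", "ctr_30d": "DOUBLE"}
    private Map<String, String> featureTypes;

    // Getters, setters, constructors omitted for brevity.
}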

Finally, the read path can be optimized further. For hot entities, Caffeine can serve as a local first-level cache inside the Spring Boot service, or Redis as a distributed second-level cache, reducing direct read pressure on Cassandra and shaving inference latency further. This introduces the usual cache-consistency trade-offs and must be weighed carefully.
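
A minimal sketch of the first-level cache idea, assuming the Caffeine dependency is added; cache size and TTL are illustrative:

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import reactor.core.publisher.Mono;
import java.time.Duration;

public class CachedFeatureReader {

    private final FeatureService featureService;

    // A short TTL keeps hot entities in memory while bounding staleness.
    private final AsyncCache<String, EntityFeature> cache = Caffeine.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(Duration.ofSeconds(30))
            .buildAsync();

    public CachedFeatureReader(FeatureService featureService) {
        this.featureService = featureService;
    }

    public Mono<EntityFeature> getLatest(String entityId) {
        // Bridge Caffeine's CompletableFuture-based AsyncCache into the
        // reactive chain; misses fall through to Cassandra exactly once.
        return Mono.fromFuture(cache.get(entityId,
                (key, executor) -> featureService.getLatestFeatures(key).toFuture()));
    }
}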

