Building an Axum-Driven Serverless MLOps Pipeline: Integrating OpenFaaS, MLflow, and SQL for On-Demand Model Personalization on iOS


Machine learning models serving mobile apps live or die by personalization. A static, one-size-fits-all model grows stale soon after deployment as user behavior drifts. Traditional centralized batch training is slow and expensive, and cannot respond in real time to the behavior of a single user. The challenge we face: when a specific user completes a key action in an iOS app, how do we immediately trigger a dedicated model update for that user, cheaply and efficiently, while keeping the entire process under strict MLOps lifecycle management?

At its core, this is an event-driven, fine-grained compute problem. Keeping a long-running training service alive to handle every user's retraining requests would be an enormous waste of resources. This is exactly where serverless shines.

Our goal is a fully automated pipeline: an action in the iOS client hits a high-performance backend service written with Axum; that service records the event and then asynchronously invokes a Python function on OpenFaaS. The function pulls the user's historical data, runs a new experiment-tracked training pass with MLflow, and records the resulting model version in the MLflow Tracking Server. Data state and the user-to-model mapping are kept transactional and consistent by a relational database (PostgreSQL).

Architecture Decisions and Technology Choices

The core of this architecture is decoupling and separation of responsibilities.

  1. Orchestration Core (Axum, Rust): the system's entry point and scheduler. We need a stable, high-performance, memory-safe API gateway to receive client requests. Axum is built on Tokio and delivers excellent concurrency and reliability. It validates requests, talks to the database, and dispatches training jobs to the compute layer. In a real project, choosing Rust means prioritizing long-running stability and tight control over system resources.

  2. Serverless Compute (OpenFaaS, Python): training jobs are compute-intensive but intermittent, a perfect fit for the serverless model. OpenFaaS is simple, open, and deployable on any Kubernetes cluster. We use Python for the training function because it has the richest ML ecosystem. Each training job is an isolated, stateless function invocation that consumes resources only while it runs.

  3. MLOps Backbone (MLflow): once the number of personalized models grows from dozens to millions, the project descends into chaos without a strong tracking system. MLflow provides experiment tracking, model version management, and a model registry. This is what lets us audit, roll back, and compare each user's model.

  4. State Persistence (PostgreSQL): MLflow can manage its own model metadata, but we need a separate, strongly consistent database for the core business data: users, key interaction events, and, most importantly, which MLflow run ID each user's active model comes from. The transactional guarantees of a SQL database are essential here.
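For concreteness, here is a minimal schema sketch for the two tables this pipeline relies on. The column names follow the queries used later in the article (interaction_events and user_models); the exact types, defaults, and index are assumptions you would tune for your own workload:

```sql
-- Events written by the Axum service, read by the training function
CREATE TABLE interaction_events (
    id         UUID PRIMARY KEY,
    user_id    UUID        NOT NULL,
    event_type TEXT        NOT NULL,
    payload    JSONB       NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_interaction_events_user ON interaction_events (user_id, created_at);

-- Maps each user to the MLflow run holding their currently active model
CREATE TABLE user_models (
    user_id       UUID PRIMARY KEY,
    active_run_id TEXT        NOT NULL,
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```

The composite index on (user_id, created_at) supports the ordered per-user history query the training function issues.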

The interaction flow is captured by the following sequence diagram:

sequenceDiagram
    participant iOS_Client as iOS Client
    participant Axum_Service as Axum Orchestrator
    participant PostgreSQL_DB as PostgreSQL
    participant OpenFaaS_GW as OpenFaaS Gateway
    participant Training_Fn as Python Training Function
    participant MLflow_Server as MLflow Tracking Server

    iOS_Client->>+Axum_Service: POST /v1/retrain (userId, eventData)
    Axum_Service->>+PostgreSQL_DB: INSERT INTO interaction_events;
    PostgreSQL_DB-->>-Axum_Service: Success
    Axum_Service->>+OpenFaaS_GW: ASYNC POST /async-function/user-retrainer (userId)
    OpenFaaS_GW-->>-Axum_Service: 202 Accepted
    Axum_Service-->>-iOS_Client: 202 Accepted (tracking_id)
    
    Note right of OpenFaaS_GW: OpenFaaS schedules the function execution.
    
    OpenFaaS_GW->>+Training_Fn: Invoke with (userId)
    Training_Fn->>+PostgreSQL_DB: SELECT * FROM interaction_events WHERE user_id = ?
    PostgreSQL_DB-->>-Training_Fn: User historical data
    
    Training_Fn->>+MLflow_Server: Start Run (experiment_name="user_personalization")
    MLflow_Server-->>-Training_Fn: new_run_id
    
    Note right of Training_Fn: Model training logic (e.g., scikit-learn)
    
    Training_Fn->>MLflow_Server: Log Params (e.g., learning_rate)
    Training_Fn->>MLflow_Server: Log Metrics (e.g., accuracy)
    Training_Fn->>MLflow_Server: Log Model Artifact (model.pkl)
    
    Training_Fn->>+PostgreSQL_DB: UPDATE user_models SET active_run_id = new_run_id WHERE user_id = ?
    PostgreSQL_DB-->>-Training_Fn: Success
    
    Training_Fn-->>-OpenFaaS_GW: Execution Finished

Production-Grade Environment Setup

Before diving into code, we need a stable, reproducible backend environment. We use docker-compose to orchestrate MLflow and PostgreSQL. The key point here is that the MLflow server needs a backend database for its metadata and an object store for artifacts such as model files. In production that is usually S3 or an equivalent cloud store; for local development we emulate it with MinIO.

docker-compose.yml:

version: '3.8'

services:
  postgres-mlflow:
    image: postgres:14
    container_name: postgres-mlflow-db
    ports:
      - "5433:5432" # Use a non-default port to avoid conflicts
    environment:
      - POSTGRES_USER=mlflow
      - POSTGRES_PASSWORD=mlflow
      - POSTGRES_DB=mlflowdb
    volumes:
      - mlflow_postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mlflow"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio:
    image: minio/minio:RELEASE.2023-01-25T00-19-54Z
    container_name: minio-mlflow-storage
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - mlflow_minio_data:/data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 5s
      timeout: 2s
      retries: 5

  mlflow:
    image: python:3.9-slim
    container_name: mlflow-tracking-server
    ports:
      - "5000:5000"
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - MLFLOW_S3_ENDPOINT_URL=http://minio:9000
    command: >
      sh -c "
      pip install mlflow boto3 psycopg2-binary &&
      mlflow server 
      --backend-store-uri postgresql://mlflow:mlflow@postgres-mlflow:5432/mlflowdb
      --default-artifact-root s3://mlflow/
      --host 0.0.0.0
      --port 5000
      "
    depends_on:
      postgres-mlflow:
        condition: service_healthy
      minio:
        condition: service_healthy

volumes:
  mlflow_postgres_data:
  mlflow_minio_data:

Once this stack is up, you must manually create a bucket named mlflow via the MinIO console at http://localhost:9001 (port 9000 is the S3 API). Forgetting this is a common initialization pitfall.
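To remove that manual step, you can add a one-shot init service to the same docker-compose.yml that creates the bucket with the MinIO client. This is a sketch using the minio/mc image and the credentials defined above:

```yaml
  minio-init:
    image: minio/mc
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      sh -c "
      mc alias set local http://minio:9000 minioadmin minioadmin &&
      mc mb --ignore-existing local/mlflow
      "
```

With this in place, the mlflow bucket exists before the tracking server logs its first artifact.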

The Core Orchestrator: the Axum Service

The Axum service is the brain of the pipeline. It handles HTTP requests, communicates with the database, and invokes OpenFaaS.

Project structure:

.
├── Cargo.toml
├── src
│   ├── main.rs
│   ├── routes.rs
│   ├── handlers.rs
│   ├── state.rs
│   └── error.rs
└── config.toml

config.toml:

[server]
host = "0.0.0.0"
port = 8000

[database]
url = "postgres://user:password@localhost:5432/appdb"

[openfaas]
gateway_url = "http://localhost:8080" # Your OpenFaaS gateway
retrain_function_name = "user-retrainer"

src/main.rs (entry point):

use anyhow::Result;
use axum::Server;
use sea_orm::{Database, DatabaseConnection};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::info;

mod error;
mod handlers;
mod routes;
mod state;

use state::AppState;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Load configuration. Hard-coded here for brevity; a real project should read it from config.toml or environment variables.
    let db_url = "postgres://youruser:yourpass@localhost:5432/personadb";
    let faas_gateway = "http://127.0.0.1:8080";
    let function_name = "user-retrainer";

    // Create the database connection pool
    let db: DatabaseConnection = Database::connect(db_url)
        .await
        .expect("Failed to connect to database");
    info!("Database connection established.");
    
    // Create a reqwest client for calling OpenFaaS
    let http_client = reqwest::Client::new();

    // Build the shared application state
    let app_state = Arc::new(AppState {
        db,
        http_client,
        openfaas_gateway_url: faas_gateway.to_string(),
        retrain_function_name: function_name.to_string(),
    });

    // Define the bind address
    let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
    info!("Server listening on {}", addr);

    // Build and run the service
    let app = routes::create_router(app_state);
    Server::bind(&addr)
        .serve(app.into_make_service())
        .await?;

    Ok(())
}

src/handlers.rs (core business logic):

use axum::{extract::State, http::StatusCode, Json};
use sea_orm::{ActiveModelTrait, Set};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{error, info};
use uuid::Uuid;

use crate::error::AppError;
use crate::state::AppState;

// A simplified entity; a real project would use SeaORM's entity generation
mod entities {
    use sea_orm::entity::prelude::*;
    
    #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
    #[sea_orm(table_name = "interaction_events")]
    pub struct Model {
        #[sea_orm(primary_key, auto_increment = false)]
        pub id: Uuid,
        pub user_id: Uuid,
        pub event_type: String,
        pub payload: Json,
        pub created_at: DateTimeUtc,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}
    impl ActiveModelBehavior for ActiveModel {}
}

#[derive(Deserialize, Debug)]
pub struct RetrainRequest {
    user_id: Uuid,
    event_data: serde_json::Value,
}

#[derive(Serialize, Debug)]
pub struct RetrainResponse {
    message: String,
    tracking_id: Uuid,
}

/// Handler for model retraining requests
pub async fn trigger_retraining_handler(
    State(state): State<Arc<AppState>>,
    Json(payload): Json<RetrainRequest>,
) -> Result<(StatusCode, Json<RetrainResponse>), AppError> {
    let tracking_id = Uuid::new_v4();
    info!(
        user_id = %payload.user_id,
        tracking_id = %tracking_id,
        "Received retraining request."
    );

    // Step 1: persist the triggering event to the database.
    // This is critical: even if the subsequent FaaS call fails, the event has already been recorded.
    let new_event = entities::ActiveModel {
        id: Set(Uuid::new_v4()),
        user_id: Set(payload.user_id),
        event_type: Set("user_feedback_positive".to_string()),
        payload: Set(payload.event_data.into()),
        created_at: Set(chrono::Utc::now()),
    };

    if let Err(db_err) = new_event.insert(&state.db).await {
        error!(error = ?db_err, "Failed to insert interaction event into DB.");
        return Err(AppError::DatabaseError(db_err));
    }
    info!(user_id = %payload.user_id, "Interaction event saved.");

    // Step 2: asynchronously invoke the OpenFaaS function.
    // Note the /async-function/ path: OpenFaaS returns 202 Accepted immediately.
    let faas_url = format!(
        "{}/async-function/{}",
        state.openfaas_gateway_url, state.retrain_function_name
    );
    
    // The OpenFaaS function's input is simply the user's ID
    let user_id_str = payload.user_id.to_string();

    let res = state
        .http_client
        .post(&faas_url)
        .body(user_id_str.clone())
        .send()
        .await;

    match res {
        Ok(response) if response.status().is_success() => {
            info!(
                user_id = %payload.user_id,
                status = %response.status(),
                "Successfully dispatched job to OpenFaaS."
            );
            Ok((
                StatusCode::ACCEPTED,
                Json(RetrainResponse {
                    message: "Retraining job accepted.".to_string(),
                    tracking_id,
                }),
            ))
        }
        Ok(response) => {
            let status = response.status();
            let body = response.text().await.unwrap_or_else(|_| "N/A".to_string());
            error!(
                user_id = %payload.user_id,
                status = %status,
                body = %body,
                "Failed to dispatch job to OpenFaaS."
            );
            Err(AppError::FaasDispatchError(format!(
                "OpenFaaS gateway returned status {}",
                status
            )))
        }
        Err(e) => {
            error!(error = ?e, "Network error when calling OpenFaaS gateway.");
            Err(AppError::FaasDispatchError(e.to_string()))
        }
    }
}

Error handling matters here. We distinguish database errors from FaaS dispatch errors. The FaaS call is asynchronous: once the OpenFaaS gateway confirms it has accepted the job, the Axum service returns immediately without blocking the client.
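The dispatch contract itself is easy to state outside of Rust. As a language-neutral sketch (helper names here are hypothetical), the handler builds the gateway's /async-function/ URL and treats any 2xx gateway response — OpenFaaS answers 202 Accepted for async invocations — as a successful hand-off:

```python
def async_invoke_url(gateway: str, function: str) -> str:
    """Async invocation path exposed by the OpenFaaS gateway."""
    return f"{gateway.rstrip('/')}/async-function/{function}"

def dispatch_succeeded(status: int) -> bool:
    """Mirrors the is_success() check in the Axum handler: any 2xx counts."""
    return 200 <= status < 300
```

Anything outside the 2xx range is surfaced to the caller as a dispatch error, since the job was never queued.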

The Compute Core: the OpenFaaS Training Function

This function is where the actual model training happens. It is packaged as a Docker image and deployed to OpenFaaS (typically with `faas-cli up -f user-retrainer.yml`).

user-retrainer/handler.py:

import os
import sys
import uuid
import logging
import psycopg2
import mlflow
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import numpy as np

# Configure logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Read configuration from environment variables, a serverless best practice
MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")
DB_CONNECT_STRING = os.getenv("DB_CONNECT_STRING")
EXPERIMENT_NAME = "user_personalization_v2"

def handle(req):
    """
    OpenFaaS handler function.
    `req` is the user_id string passed from Axum.
    """
    if not req:
        logging.error("Request body (user_id) is empty.")
        return {"status": "error", "message": "user_id is required"}, 400

    user_id = req.strip()
    logging.info(f"Starting retraining job for user_id: {user_id}")

    try:
        # Step 1: connect to MLflow and the database
        mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
        mlflow.set_experiment(EXPERIMENT_NAME)
        
        conn = psycopg2.connect(DB_CONNECT_STRING)
        cursor = conn.cursor()

        # Step 2: fetch all interaction data for this user
        cursor.execute("SELECT payload FROM interaction_events WHERE user_id = %s ORDER BY created_at ASC", (user_id,))
        records = cursor.fetchall()
        
        if len(records) < 10: # Too few training samples; skip
            logging.warning(f"Not enough data to train for user {user_id}. Found {len(records)} records.")
            return {"status": "skipped", "message": "Insufficient data"}, 200

        # Step 3: data preprocessing and feature engineering (simplified example)
        # Assumes each payload looks like {'feature': [0.1, 0.2, ...], 'label': 1}
        features = [r[0]['feature'] for r in records]
        labels = [r[0]['label'] for r in records]
        
        X = np.array(features)
        y = np.array(labels)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Step 4: track the training run with MLflow
        with mlflow.start_run(run_name=f"user_{user_id}_{uuid.uuid4().hex[:8]}") as run:
            run_id = run.info.run_id  # run_uuid is deprecated; use run_id
            logging.info(f"Started MLflow run: {run_id} for user {user_id}")
            
            mlflow.set_tag("user_id", user_id)
            mlflow.set_tag("training_trigger", "api_call")

            # Train a simple model
            params = {"C": 1.0, "solver": "liblinear"}
            model = LogisticRegression(**params)
            model.fit(X_train, y_train)

            # Evaluate the model
            accuracy = model.score(X_test, y_test)
            logging.info(f"Model accuracy for user {user_id}: {accuracy}")

            # Log params, metrics, and the model artifact
            mlflow.log_params(params)
            mlflow.log_metric("accuracy", accuracy)
            mlflow.sklearn.log_model(model, "model")

            # Step 5: update the user_models table to point at the newly trained run_id.
            # This is an UPSERT.
            update_query = """
                INSERT INTO user_models (user_id, active_run_id, updated_at)
                VALUES (%s, %s, NOW())
                ON CONFLICT (user_id) DO UPDATE
                SET active_run_id = EXCLUDED.active_run_id, updated_at = NOW();
            """
            cursor.execute(update_query, (user_id, run_id))
            conn.commit()
            logging.info(f"Updated user_models table for user {user_id} with new run_id {run_id}")

    except Exception as e:
        logging.error(f"An error occurred during training for user {user_id}: {e}", exc_info=True)
        # Even on failure the function should return normally, so OpenFaaS does not retry indefinitely
        return {"status": "error", "message": str(e)}, 500
    finally:
        if 'conn' in locals() and conn:
            cursor.close()
            conn.close()

    return {"status": "success", "run_id": run_id, "accuracy": accuracy}, 200
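The preprocessing step above bakes in an assumption about the payload shape ({'feature': [...], 'label': ...}). Pulling it into a small helper (hypothetical, not part of the deployed handler) makes that contract testable locally, without a database:

```python
MIN_RECORDS = 10  # same threshold the handler uses before training

def extract_xy(records):
    """records: a list of 1-tuples holding the JSONB payload dicts, exactly as
    psycopg2's fetchall() returns them for `SELECT payload FROM interaction_events ...`."""
    features = [r[0]["feature"] for r in records]
    labels = [r[0]["label"] for r in records]
    return features, labels

def enough_data(records) -> bool:
    """The handler skips training when the user has too little history."""
    return len(records) >= MIN_RECORDS
```

A unit test can then feed synthetic rows through extract_xy and assert on the shapes before any scikit-learn code runs.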

user-retrainer/requirements.txt:

mlflow>=2.0.0
psycopg2-binary
scikit-learn
pandas
numpy

user-retrainer.yml (OpenFaaS Stack File):

version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080
functions:
  user-retrainer:
    lang: python3-flask-debian # assumes the python3-flask template from the OpenFaaS template store, whose Flask runtime accepts the (body, status) tuples handler.py returns
    handler: ./user-retrainer
    image: your-docker-hub-username/user-retrainer:0.1.0
    environment:
      # At deploy time, inject these values via OpenFaaS secrets instead
      MLFLOW_TRACKING_URI: http://<your-mlflow-server-ip>:5000
      DB_CONNECT_STRING: "dbname='personadb' user='youruser' host='<your-db-ip>' password='yourpass'"
    labels:
      com.openfaas.scale.min: "1"
      com.openfaas.scale.max: "10"
    limits:
      memory: "1Gi"
      cpu: "500m"
    requests:
      memory: "256Mi"
      cpu: "100m"

This stack file defines the function's runtime, dependencies, environment variables, and resource limits. In production, never hard-code database passwords or other sensitive values under environment; use OpenFaaS secrets.
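In OpenFaaS the pattern is: create the secret with faas-cli, reference it in the stack file, and read it from the mounted file at runtime. A sketch (the secret name is an assumption):

```yaml
# 1) Create the secret once:
#    faas-cli secret create db-connect-string \
#      --from-literal="dbname='personadb' user='youruser' host='<your-db-ip>' password='yourpass'"
#
# 2) Reference it in user-retrainer.yml instead of the environment entry:
functions:
  user-retrainer:
    # ... existing fields ...
    secrets:
      - db-connect-string
```

At runtime the function reads the value from /var/openfaas/secrets/db-connect-string rather than from an environment variable.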

Client-Side Trigger Logic (iOS, Swift)

Finally, the iOS client needs a simple way to call our Axum service.

import Foundation

struct RetrainPayload: Codable {
    let userId: UUID
    let eventData: EventData

    // `EventData` can be whatever Codable structure you need
    struct EventData: Codable {
        let feature: [Double]
        let label: Int
    }
}

class ModelPersonalizationService {
    
    private let apiEndpoint = URL(string: "http://<your-axum-service-ip>:8000/v1/retrain")!

    func triggerRetraining(for userId: UUID, with eventData: RetrainPayload.EventData) {
        let payload = RetrainPayload(userId: userId, eventData: eventData)
        
        var request = URLRequest(url: apiEndpoint)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")
        
        do {
            let encoder = JSONEncoder()
            // The Axum handler deserializes snake_case fields (user_id, event_data)
            encoder.keyEncodingStrategy = .convertToSnakeCase
            request.httpBody = try encoder.encode(payload)
        } catch {
            print("Error encoding payload: \(error)")
            return
        }
        
        // URLSession runs this task asynchronously off the main thread;
        // production code should add more robust error handling
        let task = URLSession.shared.dataTask(with: request) { data, response, error in
            if let error = error {
                print("Network request failed: \(error)")
                return
            }
            
            guard let httpResponse = response as? HTTPURLResponse else {
                print("Invalid response")
                return
            }
            
            // The Axum service returns 202 Accepted
            if httpResponse.statusCode == 202 {
                print("Retraining job successfully accepted by the server.")
            } else {
                print("Server returned non-202 status code: \(httpResponse.statusCode)")
                if let data = data, let responseBody = String(data: data, encoding: .utf8) {
                    print("Response body: \(responseBody)")
                }
            }
        }
        task.resume()
    }
}

// Usage example:
// let service = ModelPersonalizationService()
// let currentUserId = UUID() // obtained from the user session
// let userFeedback = RetrainPayload.EventData(feature: [0.1, 0.9, 0.4], label: 1)
// service.triggerRetraining(for: currentUserId, with: userFeedback)

This Swift code builds the request body and sends the POST to the Axum endpoint asynchronously. In a real app, the call would be embedded in the logic that runs when a user completes a key action (favorite, purchase, like).

Limitations and Future Iterations

This architecture solves the core problem, but several aspects still need attention in production.

First, OpenFaaS cold-start latency is unavoidable. For scenarios that demand very low response latency, you can configure a minimum instance count for the function (min scale > 0), at the cost of part of the cost advantage.

Second, the training function's resource limits are a hard constraint. For complex models that need large amounts of memory or long runtimes (deep learning models, for instance), serverless may not be the best choice. In that case, consider replacing OpenFaaS with Kubeflow or Argo Workflows and having the Axum service dynamically create and submit a Kubernetes Job.

Third, security. In the current architecture, the Axum-to-OpenFaaS and OpenFaaS-to-MLflow/DB traffic stays on the internal network, but it should still be encrypted with mTLS. The public Axum endpoint needs authentication and authorization so that only legitimate clients can trigger training.

Finally, model deployment. This article covers on-demand training and tracking, but not how the newly trained model is consumed by the iOS client. Closing the loop requires a model-serving component (such as MLflow Model Serving or Seldon Core); the client can then pull its own latest personalized model periodically or after a push notification. That is the key direction for the system's next iteration.
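For that next step, the pieces are already in place: the serving side only needs the active_run_id stored in user_models. Since the training function logs the model under the artifact path "model", MLflow's runs:/ URI scheme locates it. A sketch (the load call itself requires the mlflow package and network access to the tracking server, so it is shown as a comment):

```python
def personalized_model_uri(active_run_id: str) -> str:
    """Artifact URI for the model logged via mlflow.sklearn.log_model(model, "model")."""
    return f"runs:/{active_run_id}/model"

# In a serving component:
#   import mlflow.sklearn
#   model = mlflow.sklearn.load_model(personalized_model_uri(run_id))
#   prediction = model.predict(features)
```

A serving endpoint would look up the user's active_run_id in PostgreSQL, resolve the URI, and cache the loaded model per user.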

