Automating the Deployment of a Hybrid Feature Store Built on Weaviate, Apache Iceberg, and MariaDB with Puppet


Our machine learning platform started out in a state of chaos. Every component of the feature engineering pipeline (offline batch processing, online real-time queries, vector similarity search) was deployed and maintained by hand, each by a different team. Environment drift became routine: a "small" configuration change in the development environment could trigger cascading failures in production. The root cause was the lack of a unified, declarative paradigm for managing our infrastructure. Once the feature store evolved into a hybrid of MariaDB (online), Apache Iceberg (offline), and Weaviate (vectors), the complexity of manual management became intolerable.

Our goal was clear: define the deployment, configuration, and dependencies of the entire hybrid feature store stack precisely, as code. What we needed was not a pile of scattered shell scripts but a system that guarantees state convergence and idempotent execution. After evaluation we chose Puppet. Its mature ecosystem, powerful declarative DSL, and agent-based architecture are well suited to managing a complex, heterogeneous cluster that spans several kinds of middleware.

Step 1: Defining the Base Role and Overall Architecture

In a real project, writing configuration directly for each application is a recipe for disaster. We first abstract the configuration shared by all nodes into a base profile. This covers NTP time synchronization, the security baseline, user account management, and the required package repositories.
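As a point of reference, a minimal sketch of such a base profile is shown below. The package names, the service account, and the repository URL are illustrative assumptions; substitute whatever your fleet actually standardizes on.

manifests/profile/base.pp:

# Class: profile::base
# Description: Baseline configuration shared by every node in the fleet.
# NOTE: a minimal sketch; package names, the account, and the repo URL are assumptions.
class profile::base {
  # 1. Time synchronization
  package { 'chrony':
    ensure => installed,
  }

  service { 'chronyd':
    ensure  => running,
    enable  => true,
    require => Package['chrony'],
  }

  # 2. A dedicated service account shared by the feature store components
  user { 'featurestore':
    ensure     => present,
    managehome => true,
    shell      => '/bin/bash',
  }

  # 3. An internal package mirror (hypothetical URL)
  yumrepo { 'internal-mirror':
    ensure   => present,
    baseurl  => 'https://mirror.internal.example/el/$releasever/$basearch',
    enabled  => '1',
    gpgcheck => '1',
  }
}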

Next, we need to plan how the whole architecture is represented in Puppet. The diagram below sketches the layout:

graph TD
    subgraph Puppet Master
        A[roles/feature_store.pp] --> B{profiles};
    end

    subgraph profiles
        B --> C[profile::base];
        B --> D[profile::mariadb_server];
        B --> E[profile::spark_thrift_server];
        B --> F[profile::weaviate_node];
    end

    subgraph Node Classification
        G[Feature Store Online Node] -- apply --> D;
        H[Feature Store Offline Node] -- apply --> E;
        I[Feature Store Vector Node] -- apply --> F;
        G -- apply --> C;
        H -- apply --> C;
        I -- apply --> C;
    end

    C -.-> |Manages| J[Common Packages, NTP, Security];
    D -.-> |Manages| K[MariaDB Service & Config];
    E -.-> |Manages| L[Spark Thrift Server for Iceberg];
    F -.-> |Manages| M[Weaviate Docker Container];

This diagram captures the roles and profiles pattern we follow: a node is assigned exactly one role, and a role is composed of several concrete profiles. The separation lets us maintain each component's configuration independently while still combining them flexibly to define servers with different responsibilities.
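To make the pattern concrete, here is a sketch of the three role classes that mirror the diagram. Each class would live in its own file under manifests/role/, and the names are assumptions to adapt to your control repo.

manifests/role/feature_store_online.pp (and siblings):

# A node gets exactly one role; a role does nothing but compose profiles.
class role::feature_store_online {
  include profile::base
  include profile::mariadb_server
}

class role::feature_store_offline {
  include profile::base
  include profile::spark_thrift_server
}

class role::feature_store_vector {
  include profile::base
  include profile::weaviate_node
}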

Step 2: Pinning Down the MariaDB Online Store with Puppet

The online store demands low latency and high concurrency, and MariaDB is a reliable choice for that. Our Puppet profile must not only install it, but also initialize the database, create the application user, and apply production performance tuning.

The pitfall here is that the schema initialization script must not be executed over and over again; that would violate Puppet's idempotency principle. A common mistake is to run the .sql file with a bare exec resource.

Our solution is to use the exec resource's creates (or unless) parameter to guarantee idempotency.

manifests/profile/mariadb_server.pp:

# Class: profile::mariadb_server
# Description: Manages the MariaDB server for the online feature store.
#
class profile::mariadb_server {
  # 1. Install MariaDB server and client packages
  package { ['mariadb-server', 'mariadb-client']:
    ensure => installed,
  }

  # 2. Manage the service, ensure it's running and enabled
  service { 'mariadb':
    ensure => running,
    enable => true,
    require => Package['mariadb-server'],
  }

  # 3. Deploy a production-ready configuration file from a template.
  #    This allows us to manage critical parameters like innodb_buffer_pool_size
  #    via Hiera data.
  file { '/etc/my.cnf.d/server.cnf':
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    content => template('profile/my.cnf.d/server.cnf.erb'),
    notify  => Service['mariadb'], # Restart service on config change
  }

  # 4. Initialize the database and user. This is the critical part for idempotency.
  #    We create a "sentinel" file to track whether initialization has completed.
  $db_name = 'feature_store_online'
  $db_user = 'fs_user'
  $db_password = Sensitive('lookup_your_secret_here') # Use Hiera eyaml for secrets

  $init_sql_path = '/opt/puppetlabs/puppet/cache/init_feature_store.sql'
  $sentinel_file = "/var/lib/mysql/.puppet_db_initialized_${db_name}"

  file { $init_sql_path:
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0600',
    content => epp('profile/init_feature_store.sql.epp', {
      'db_name'     => $db_name,
      'db_user'     => $db_user,
      'db_password' => $db_password.unwrap,
    }),
  }

  exec { 'initialize-feature-store-db':
    # Run the init SQL; '&& touch' creates the sentinel only on success
    # (exit code 0), and the final 'rm' removes the SQL file that contains
    # sensitive data. The File resource above will lay the SQL file down
    # again on later runs, but the sentinel keeps this exec from re-running.
    command => "/usr/bin/mysql -u root < ${init_sql_path} && /bin/touch ${sentinel_file} && /bin/rm -f ${init_sql_path}",
    path    => ['/bin/', '/usr/bin/'],
    # Only run if the sentinel file does not exist yet.
    creates => $sentinel_file,
    require => [Service['mariadb'], File[$init_sql_path]],
  }
}

templates/profile/init_feature_store.sql.epp:

<%- | String $db_name, String $db_user, String $db_password | -%>
-- This EPP template generates the initialization SQL script.
CREATE DATABASE IF NOT EXISTS <%= $db_name %>;
CREATE USER IF NOT EXISTS '<%= $db_user %>'@'localhost' IDENTIFIED BY '<%= $db_password %>';
GRANT SELECT, INSERT, UPDATE, DELETE ON <%= $db_name %>.* TO '<%= $db_user %>'@'localhost';
FLUSH PRIVILEGES;

USE <%= $db_name %>;

-- Define the schema for the online store.
-- For example, storing the latest feature values.
CREATE TABLE `latest_features` (
  `entity_id` VARCHAR(255) NOT NULL,
  `feature_name` VARCHAR(255) NOT NULL,
  `feature_value` BLOB,
  `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`entity_id`, `feature_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

With the creates parameter and a sentinel file, database and table creation runs exactly once. This is a pragmatic and robust way to handle stateful services in configuration management.
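An alternative that avoids the sentinel file entirely is to let the exec ask MariaDB itself whether the schema already exists, via the unless parameter. A sketch, assuming the latest_features table from the template above is a reliable marker:

  exec { 'initialize-feature-store-db':
    command => "/usr/bin/mysql -u root < ${init_sql_path}",
    path    => ['/bin/', '/usr/bin/'],
    # Skip the run when the schema is already in place.
    unless  => "/usr/bin/mysql -u root -N -e \"SELECT 1 FROM information_schema.tables WHERE table_schema = '${db_name}' AND table_name = 'latest_features'\" | /bin/grep -q 1",
    require => [Service['mariadb'], File[$init_sql_path]],
  }

The trade-off: the unless check reflects the real state of the database (it survives a restore from backup, for example), at the cost of running a query on every agent run.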

Step 3: Configuring a Spark Thrift Server with Iceberg Support

Apache Iceberg is itself a table format; it has no standalone service. We need to configure a compute engine to access it. In our setup, the Spark Thrift Server provides a SQL interface so that data scientists and analytics tools can query the offline feature data.

Puppet's job is to manage Spark's configuration: place all dependencies (such as the Iceberg and AWS S3 JARs) correctly, and make sure spark-defaults.conf contains every property needed to connect to the Iceberg catalog (we use a JDBC catalog backed by MariaDB).

manifests/profile/spark_thrift_server.pp:

# Class: profile::spark_thrift_server
# Description: Manages a Spark Thrift Server configured to use Apache Iceberg.
#
class profile::spark_thrift_server {
  # Assume Spark is already installed, either via package or custom method.
  # We focus on managing its configuration.

  $spark_home = '/opt/spark'
  $spark_conf_dir = "${spark_home}/conf"
  $spark_jars_dir = "${spark_home}/jars"

  # 1. Define required JARs for Iceberg and S3 access
  $iceberg_version = '1.4.2'
  $aws_sdk_version = '2.17.230'

  $required_jars = {
    "iceberg-spark-runtime-3.4_2.12-${iceberg_version}.jar" => "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/${iceberg_version}/iceberg-spark-runtime-3.4_2.12-${iceberg_version}.jar",
    "bundle-${aws_sdk_version}.jar" => "https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${aws_sdk_version}/bundle-${aws_sdk_version}.jar",
    "mariadb-java-client-3.1.4.jar" => "https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.1.4/mariadb-java-client-3.1.4.jar",
  }

  # 2. Use the puppet-archive module to download the JARs directly into the
  #    Spark jars directory. Downloads are idempotent: archive only fetches a
  #    file that is missing (add a checksum parameter to also verify content).
  $required_jars.each |$jar_name, $source_url| {
    archive { "${spark_jars_dir}/${jar_name}":
      ensure  => present,
      source  => $source_url,
      extract => false,
      cleanup => false,
    }

    # Manage ownership and permissions of the downloaded JAR; with no source
    # or content set, this file resource never touches the JAR's contents.
    file { "${spark_jars_dir}/${jar_name}":
      ensure  => file,
      owner   => 'spark',
      group   => 'spark',
      mode    => '0644',
      require => Archive["${spark_jars_dir}/${jar_name}"],
    }
  }

  # 3. Manage spark-defaults.conf with Iceberg catalog configuration
  # Here we are using a JDBC catalog pointing to a separate MariaDB instance.
  # This could even be the same instance as the online store, using a different database.
  $catalog_db_host = 'mariadb.internal.host'
  $catalog_db_user = 'iceberg_catalog_user'
  $catalog_db_password = Sensitive('lookup_another_secret')
  $s3_warehouse_path = 's3a://my-iceberg-warehouse/'

  file { "${spark_conf_dir}/spark-defaults.conf":
    ensure  => file,
    owner   => 'spark',
    group   => 'spark',
    mode    => '0640', # the file embeds a JDBC password; avoid world-readable
    content => epp('profile/spark-defaults.conf.epp', {
      'catalog_db_host'     => $catalog_db_host,
      'catalog_db_user'     => $catalog_db_user,
      'catalog_db_password' => $catalog_db_password.unwrap,
      's3_warehouse_path'   => $s3_warehouse_path,
    }),
    notify  => Service['spark-thrift-server'],
  }

  # 4. Manage the systemd service for the thrift server
  service { 'spark-thrift-server':
    ensure => running,
    enable => true,
  }
}

templates/profile/spark-defaults.conf.epp:

<%- | String $catalog_db_host, String $catalog_db_user, String $catalog_db_password, String $s3_warehouse_path | -%>
# This file is managed by Puppet. Do not edit manually.

# General Spark properties
spark.master                     yarn
spark.submit.deployMode          client

# Iceberg SQL extensions and Catalog configuration
spark.sql.extensions               org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.prod_catalog     org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.prod_catalog.type jdbc
spark.sql.catalog.prod_catalog.uri         jdbc:mariadb://<%= $catalog_db_host %>:3306/iceberg_catalog_db
spark.sql.catalog.prod_catalog.jdbc.user   <%= $catalog_db_user %>
spark.sql.catalog.prod_catalog.jdbc.password <%= $catalog_db_password %>
spark.sql.catalog.prod_catalog.warehouse   <%= $s3_warehouse_path %>
spark.sql.catalog.prod_catalog.io-impl     org.apache.iceberg.aws.s3.S3FileIO

# AWS S3 configurations
spark.hadoop.fs.s3a.impl                  org.apache.hadoop.fs.s3a.S3AFileSystem
# In production, use instance profiles or other secure credential mechanisms
# spark.hadoop.fs.s3a.access.key          YOUR_ACCESS_KEY
# spark.hadoop.fs.s3a.secret.key          YOUR_SECRET_KEY

This profile solves two big problems: dependency management and dynamic configuration generation. Using the puppet-archive module is far more robust than shelling out to wget with exec, and EPP templates let us pull sensitive values and environment-specific settings (such as database addresses) out of the code and into Hiera.
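To make that separation concrete, the catalog settings that are hard-coded above can be turned into class parameters, which Hiera then resolves through automatic parameter lookup (profile::spark_thrift_server::catalog_db_host and so on). A sketch; the defaults and key names are assumptions:

# Hiera-driven variant of the class header (sketch).
class profile::spark_thrift_server (
  String            $catalog_db_host     = 'mariadb.internal.host',
  String            $catalog_db_user     = 'iceberg_catalog_user',
  Sensitive[String] $catalog_db_password = Sensitive('changeme'),
  String            $s3_warehouse_path   = 's3a://my-iceberg-warehouse/',
) {
  # ...same resources as above, now referencing the parameters instead of
  # locally hard-coded variables...
}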

Step 4: Deploying the Weaviate Vector Database in Containers

Weaviate's official recommendation is Docker-based deployment. Managing Docker containers with Puppet is a common pattern, and the puppetlabs-docker module does most of the work. This is much cleaner than installing binaries directly on the node, and it makes version upgrades far easier.

Our profile needs to do the following:

  1. Install Docker.
  2. Pull the specified Weaviate image.
  3. Use docker-compose (with the docker-compose.yml file managed by Puppet) to define and run the Weaviate service and its dependencies, such as the text vectorization model.
  4. Make sure data is persisted to a dedicated directory on the host.

manifests/profile/weaviate_node.pp:

# Class: profile::weaviate_node
# Description: Manages a Weaviate vector database node via Docker Compose.
#
class profile::weaviate_node {
  # 1. Ensure Docker and Docker Compose are installed and running.
  # This leverages the official puppetlabs-docker module.
  include docker
  include docker::compose

  # 2. Define the directory for Docker Compose file and Weaviate data
  $compose_dir = '/opt/weaviate'
  $data_dir = '/var/lib/weaviate'

  file { [$compose_dir, $data_dir]:
    ensure => directory,
    owner  => 'root', # Or a dedicated user
    group  => 'root',
  }

  # 3. Define Weaviate version and configuration via Hiera
  $weaviate_version = lookup('profile::weaviate::version', { default_value => '1.23.7' })
  $transformer_model = lookup('profile::weaviate::transformer_model', { default_value => 'sentence-transformers/all-MiniLM-L6-v2' })
  $cluster_hostname = $facts['networking']['fqdn']

  # 4. Create the docker-compose.yml file from a template
  file { "${compose_dir}/docker-compose.yml":
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    content => template('profile/weaviate-compose.yml.erb'),
    notify  => Docker_compose['weaviate-stack'],
  }

  # 5. Define the Docker Compose stack via the docker_compose resource type
  #    provided by the puppetlabs-docker module.
  docker_compose { 'weaviate-stack':
    compose_files => ["${compose_dir}/docker-compose.yml"],
    ensure        => 'present', # 'present' means the stack is up and running
    require       => File["${compose_dir}/docker-compose.yml"],
  }
}

templates/profile/weaviate-compose.yml.erb:

# This file is managed by Puppet. Do not edit manually.
version: '3.4'
services:
  weaviate:
    command:
    - --host
    - 0.0.0.0
    - --port
    - '8080'
    - --scheme
    - http
    image: semitechnologies/weaviate:<%= @weaviate_version %>
    ports:
    - 8080:8080
    restart: on-failure:0
    volumes:
    - <%= @data_dir %>:/var/lib/weaviate
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers'
      ENABLE_MODULES: 'text2vec-transformers'
      TRANSFORMERS_INFERENCE_API: 'http://t2v-transformers:8080'
      CLUSTER_HOSTNAME: '<%= @cluster_hostname %>'
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2
    environment:
      ENABLE_CUDA: '0' # Set to '1' if GPU is available
      # You can control model loading here
      # SENTENCE_TRANSFORMERS_MODEL: '<%= @transformer_model %>'

With this approach we have pinned not only Weaviate's version and configuration but also the vectorization model container it depends on. Upgrading Weaviate or swapping the model is now a matter of changing Hiera data and letting the Puppet agent run once, and the whole process stays safe and controlled.

Final Result and Verification

Once all of these profiles were in place, we composed them into the feature_store roles and applied them to the corresponding nodes. The Puppet agent runs automatically and converges each node to the state defined in our code.
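Node classification can stay equally simple: a few node definitions in site.pp (or an external node classifier) map hostnames to roles. The hostname patterns below are assumptions:

# site.pp (sketch): one role per node.
node /^fs-online-\d+/ {
  include role::feature_store_online
}

node /^fs-offline-\d+/ {
  include role::feature_store_offline
}

node /^fs-vector-\d+/ {
  include role::feature_store_vector
}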

To verify that the whole system works as intended, we can write a small Python script that walks a feature through its full lifecycle:

  1. Produce a feature and write its latest value to MariaDB.
  2. Append the feature's history (with timestamps) to an Apache Iceberg table.
  3. Generate a vector for the feature's metadata (such as its description text) and store it in Weaviate.
  4. Finally, run a similarity search against Weaviate to verify vector retrieval.

# verification_script.py
import mariadb
import weaviate
from pyiceberg.catalog import load_catalog
from sentence_transformers import SentenceTransformer

# --- Configuration ---
MARIADB_HOST = 'feature-store-online-node.internal'
MARIADB_USER = 'fs_user'
MARIADB_PASSWORD = '...' # Load from a secure source
MARIADB_DB = 'feature_store_online'

WEAVIATE_URL = 'http://feature-store-vector-node.internal:8080'

# Iceberg catalog for the offline store. Here we load it with pyiceberg's
# REST catalog interface as an example; adjust type/uri to your deployment
# (the Spark Thrift Server above uses the JDBC catalog).
ICEBERG_CATALOG = load_catalog(
    "prod_catalog",
    **{
        "type": "rest", # Or use JDBC if connecting directly
        "uri": "http://iceberg-rest-catalog-url",
    }
)

# --- Feature Data ---
entity_id = "user_123"
feature_name = "user_7day_purchase_amount"
feature_value = 150.75
feature_description = "The total purchase amount of a user in the last 7 days."

# --- Main Logic ---
def main():
    # 1. Write to MariaDB (Online Store)
    try:
        conn = mariadb.connect(host=MARIADB_HOST, user=MARIADB_USER, password=MARIADB_PASSWORD, database=MARIADB_DB)
        cursor = conn.cursor()
        print("Writing to MariaDB...")
        cursor.execute(
            "INSERT INTO latest_features (entity_id, feature_name, feature_value) VALUES (?, ?, ?) "
            "ON DUPLICATE KEY UPDATE feature_value = VALUES(feature_value)",
            (entity_id, feature_name, str(feature_value).encode('utf-8'))
        )
        conn.commit()
        print("MariaDB write successful.")
    except mariadb.Error as e:
        print(f"Error connecting or writing to MariaDB: {e}")
    finally:
        if 'conn' in locals() and conn:
            conn.close()

    # 2. Append to Apache Iceberg (Offline Store)
    # This part is more complex, typically done via a Spark job.
    # Here we conceptualize the interaction.
    print("\nSimulating write to Iceberg via Spark/Trino...")
    # df = spark.createDataFrame([(entity_id, feature_name, feature_value, datetime.now())], ["id", "name", "value", "ts"])
    # df.writeTo("prod_catalog.feature_logs.historical_features").append()
    print("Iceberg append concept successful.")


    # 3. Write to Weaviate (Vector Store)
    try:
        client = weaviate.Client(WEAVIATE_URL)
        model = SentenceTransformer('all-MiniLM-L6-v2')
        vector = model.encode(feature_description).tolist()

        feature_object = {
            "name": feature_name,
            "description": feature_description,
        }
        
        print("\nWriting to Weaviate...")
        client.data_object.create(
            data_object=feature_object,
            class_name="FeatureMetadata",
            vector=vector
        )
        print("Weaviate write successful.")

        # 4. Search in Weaviate
        near_text = {"concepts": ["user spending habits"]}
        result = client.query.get("FeatureMetadata", ["name", "description"]).with_near_text(near_text).with_limit(2).do()
        print("\nWeaviate search result:")
        print(result)

    except Exception as e:
        print(f"Error interacting with Weaviate: {e}")

if __name__ == "__main__":
    main()

The script simplifies the Iceberg write (in practice that is done by a Spark job), but it clearly demonstrates that all the components deployed by Puppet are configured correctly and work together.

Limitations and Outlook

This Puppet-based setup does an excellent job of keeping a fleet of virtual or physical machines in a consistent state. It dramatically reduces the complexity of environment management and the risk of human error. It is not a silver bullet, though.

The architecture's elasticity is limited. If we need to scale Weaviate nodes or Spark workers up and down with load, Puppet's model starts to feel clumsy. In that scenario, migrating the stack to Kubernetes and refactoring the logic of each Puppet profile into a corresponding Kubernetes Operator would be the more modern, cloud-native choice. Operators can handle more sophisticated lifecycle management such as autoscaling, self-healing, and rolling updates.

In addition, the current approach focuses on infrastructure deployment (Day 1 operations); coverage of monitoring, log collection, and alerting (Day 2 operations) is still thin. The next iteration will integrate Prometheus, Grafana, and OpenTelemetry, again deployed and configured with Puppet, to close the loop from deployment to operations.

