Our machine learning platform started out in chaos. Every component of the feature engineering pipeline (offline batch processing, online real-time lookups, vector similarity search) was deployed and maintained by hand by a different team. Environment drift became routine: a "minor" configuration tweak in development could trigger cascading failures in production. The root cause was the lack of a unified, declarative paradigm for managing our infrastructure. Once our feature store evolved into a hybrid of MariaDB (online), Apache Iceberg (offline), and Weaviate (vectors), the complexity of manual management became intolerable.
Our goal was clear: define the deployment, configuration, and dependencies of the entire hybrid feature store stack precisely, as code. We needed not a pile of ad-hoc shell scripts, but a system that guarantees state convergence and idempotent execution. After evaluation we settled on Puppet: its mature ecosystem, expressive declarative DSL, and agent-based architecture are well suited to managing a complex, heterogeneous cluster that spans this many kinds of middleware.
Step 1: Define the Base Role and Overall Architecture
In a real project, writing configuration directly for each application is the beginning of a disaster. We first abstract the baseline configuration shared by all nodes into a `base` profile: NTP time synchronization, the security baseline, user account management, and the required package repositories.
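What exactly goes into `base` is site-specific. A minimal sketch is shown below; the `fs_deploy` user and the package choices are illustrative assumptions, and the security baseline and repository management are left out for brevity.

manifests/profile/base.pp (sketch):
class profile::base {
  # Time synchronization on every node
  package { 'chrony':
    ensure => installed,
  }
  service { 'chronyd':
    ensure  => running,
    enable  => true,
    require => Package['chrony'],
  }

  # A shared deployment user for the feature store components (assumed name)
  user { 'fs_deploy':
    ensure     => present,
    shell      => '/bin/bash',
    managehome => true,
  }

  # Baseline tooling every node should have
  package { ['curl', 'jq', 'tar']:
    ensure => installed,
  }
}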
Next, we need to plan how this architecture is expressed in Puppet.
graph TD
    subgraph Puppet Master
        A[roles/feature_store.pp] --> B{profiles};
    end
    subgraph profiles
        B --> C[profile::base];
        B --> D[profile::mariadb_server];
        B --> E[profile::spark_thrift_server];
        B --> F[profile::weaviate_node];
    end
    subgraph Node Classification
        G[Feature Store Online Node] -- apply --> D;
        H[Feature Store Offline Node] -- apply --> E;
        I[Feature Store Vector Node] -- apply --> F;
        G -- apply --> C;
        H -- apply --> C;
        I -- apply --> C;
    end
    C -.-> |Manages| J[Common Packages, NTP, Security];
    D -.-> |Manages| K[MariaDB Service & Config];
    E -.-> |Manages| L[Spark Thrift Server for Iceberg];
    F -.-> |Manages| M[Weaviate Docker Container];
This diagram shows the roles-and-profiles pattern we follow. A node is assigned exactly one `role`, and a `role` is composed of several concrete `profile` classes. The separation lets us maintain each component's configuration independently while still combining them flexibly to define servers with different responsibilities.
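As a concrete illustration of the pattern, a role class is nothing more than a set of `include` statements. A minimal sketch for the online node follows; the class names are assumptions that mirror the diagram above rather than the exact production code.

manifests/role/feature_store_online.pp (sketch):
class role::feature_store_online {
  include profile::base
  include profile::mariadb_server
}

The offline and vector roles follow the same shape, swapping in profile::spark_thrift_server and profile::weaviate_node respectively.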
Step 2: Locking Down the MariaDB Online Store with Puppet
The online store demands low latency and high concurrency, and MariaDB is a reliable choice for that role. Our Puppet profile has to do more than install it: it also initializes the database, creates the application user, and applies the production performance tuning configuration.
The pitfall here is that the schema initialization script must not be re-run on every agent run, which would violate Puppet's idempotency principle. A common mistake is to run the `.sql` file with a bare `exec` resource. Our solution is to use the `unless` or `creates` parameter of `exec` to guarantee idempotency.
manifests/profile/mariadb_server.pp:
# Class: profile::mariadb_server
# Description: Manages the MariaDB server for the online feature store.
#
class profile::mariadb_server {
  # 1. Install MariaDB server and client packages
  package { ['mariadb-server', 'mariadb-client']:
    ensure => installed,
  }

  # 2. Manage the service, ensure it's running and enabled
  service { 'mariadb':
    ensure  => running,
    enable  => true,
    require => Package['mariadb-server'],
  }

  # 3. Deploy a production-ready configuration file from a template.
  #    This allows us to manage critical parameters like innodb_buffer_pool_size
  #    via Hiera data.
  file { '/etc/my.cnf.d/server.cnf':
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    content => template('profile/my.cnf.d/server.cnf.erb'),
    notify  => Service['mariadb'], # Restart service on config change
  }

  # 4. Initialize the database and user. This is the critical part for idempotency.
  #    We create a "sentinel" file to track whether initialization has completed.
  $db_name       = 'feature_store_online'
  $db_user       = 'fs_user'
  $db_password   = sensitive('lookup_your_secret_here') # Use Hiera eyaml for secrets
  $init_sql_path = '/opt/puppetlabs/puppet/cache/init_feature_store.sql'
  $sentinel_file = "/var/lib/mysql/.puppet_db_initialized_${db_name}"

  file { $init_sql_path:
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0600',
    content => epp('profile/init_feature_store.sql.epp', {
      'db_name'     => $db_name,
      'db_user'     => $db_user,
      'db_password' => $db_password.unwrap,
    }),
  }
  exec { 'initialize-feature-store-db':
    # The '&& touch' creates the sentinel only on success (exit code 0), and the
    # trailing 'rm' removes the rendered SQL file, which contains sensitive data.
    # This sentinel approach is simpler than checking for tables inside MySQL.
    command => "/usr/bin/mysql -u root < ${init_sql_path} && /bin/touch ${sentinel_file} && /bin/rm -f ${init_sql_path}",
    path    => ['/bin/', '/usr/bin/'],
    # The command only runs if the sentinel file does not exist.
    creates => $sentinel_file,
    require => [Service['mariadb'], File[$init_sql_path]],
  }
}
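Resource #3 above references a server.cnf.erb template that is not reproduced here. A minimal sketch of what it might contain, assuming the class is extended with Hiera-backed parameters such as $innodb_buffer_pool_size and $max_connections (both names are illustrative):

templates/profile/my.cnf.d/server.cnf.erb (sketch):
# This file is managed by Puppet. Do not edit manually.
# Assumes Hiera-backed class parameters; the variable names below are illustrative.
[mysqld]
bind-address                   = 0.0.0.0
max_connections                = <%= @max_connections %>
innodb_buffer_pool_size        = <%= @innodb_buffer_pool_size %>
innodb_flush_log_at_trx_commit = 1
character-set-server           = utf8mb4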
templates/profile/init_feature_store.sql.epp:
<%- | String $db_name, String $db_user, String $db_password | -%>
-- This EPP template generates the initialization SQL script.
CREATE DATABASE IF NOT EXISTS <%= $db_name %>;
CREATE USER IF NOT EXISTS '<%= $db_user %>'@'localhost' IDENTIFIED BY '<%= $db_password %>';
GRANT SELECT, INSERT, UPDATE, DELETE ON <%= $db_name %>.* TO '<%= $db_user %>'@'localhost';
FLUSH PRIVILEGES;

USE <%= $db_name %>;

-- Define the schema for the online store.
-- For example, storing the latest feature values.
CREATE TABLE `latest_features` (
  `entity_id`     VARCHAR(255) NOT NULL,
  `feature_name`  VARCHAR(255) NOT NULL,
  `feature_value` BLOB,
  `updated_at`    TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`entity_id`, `feature_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
With the `creates` parameter and a sentinel file, database and table creation runs exactly once. This is a pragmatic and robust way to handle stateful services in configuration management.
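If a sentinel file on disk feels too indirect, the `unless` parameter can make the database itself the source of truth. A hedged sketch of that variant, reusing the variables from the manifest above and assuming passwordless root access over the local socket:

exec { 'initialize-feature-store-db':
  command => "/usr/bin/mysql -u root < ${init_sql_path}",
  path    => ['/bin/', '/usr/bin/'],
  # Skip the run entirely if the target database already exists.
  unless  => "/usr/bin/mysql -u root -N -e \"SHOW DATABASES LIKE '${db_name}'\" | /bin/grep -q '${db_name}'",
  require => [Service['mariadb'], File[$init_sql_path]],
}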
Step 3: Configuring a Spark Thrift Server with Iceberg Support
Apache Iceberg is a table format, not a standalone service, so a compute engine is needed to access it. In our setup, a Spark Thrift Server exposes a SQL interface so that data scientists and analytics tools can query the offline feature data.
Puppet's job is to manage the Spark configuration: ensure every dependency (such as the Iceberg and AWS S3 JARs) is in place, and that `spark-defaults.conf` contains all the properties needed to reach the Iceberg catalog (we use a JDBC catalog backed by MariaDB).
manifests/profile/spark_thrift_server.pp:
# Class: profile::spark_thrift_server
# Description: Manages a Spark Thrift Server configured to use Apache Iceberg.
#
class profile::spark_thrift_server {
  # Assume Spark is already installed, either via package or custom method.
  # We focus on managing its configuration.
  $spark_home     = '/opt/spark'
  $spark_conf_dir = "${spark_home}/conf"
  $spark_jars_dir = "${spark_home}/jars"

  # 1. Define required JARs for Iceberg and S3 access
  $iceberg_version = '1.4.2'
  $aws_sdk_version = '2.17.230'
  $required_jars   = {
    "iceberg-spark-runtime-3.4_2.12-${iceberg_version}.jar" => "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/${iceberg_version}/iceberg-spark-runtime-3.4_2.12-${iceberg_version}.jar",
    "bundle-${aws_sdk_version}.jar" => "https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${aws_sdk_version}/bundle-${aws_sdk_version}.jar",
    'mariadb-java-client-3.1.4.jar' => 'https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.1.4/mariadb-java-client-3.1.4.jar',
  }

  # 2. Use puppet-archive module to download the JARs
  # This ensures idempotency; files are only downloaded if they don't exist or checksum fails.
  $required_jars.each |$jar_name, $source_url| {
    archive { "/tmp/${jar_name}":
      ensure  => present,
      source  => $source_url,
      extract => false,
      cleanup => true,
      before  => File["${spark_jars_dir}/${jar_name}"],
    }

    file { "${spark_jars_dir}/${jar_name}":
      ensure  => file,
      owner   => 'spark',
      group   => 'spark',
      mode    => '0644',
      source  => "/tmp/${jar_name}",
      require => Archive["/tmp/${jar_name}"],
    }
  }

  # 3. Manage spark-defaults.conf with Iceberg catalog configuration
  # Here we are using a JDBC catalog pointing to a separate MariaDB instance.
  # This could even be the same instance as the online store, using a different database.
  $catalog_db_host     = 'mariadb.internal.host'
  $catalog_db_user     = 'iceberg_catalog_user'
  $catalog_db_password = sensitive('lookup_another_secret')
  $s3_warehouse_path   = 's3a://my-iceberg-warehouse/'

  file { "${spark_conf_dir}/spark-defaults.conf":
    ensure  => file,
    owner   => 'spark',
    group   => 'spark',
    mode    => '0644',
    content => epp('profile/spark-defaults.conf.epp', {
      'catalog_db_host'     => $catalog_db_host,
      'catalog_db_user'     => $catalog_db_user,
      'catalog_db_password' => $catalog_db_password.unwrap,
      's3_warehouse_path'   => $s3_warehouse_path,
    }),
    notify  => Service['spark-thrift-server'],
  }

  # 4. Manage the systemd service for the thrift server
  service { 'spark-thrift-server':
    ensure => running,
    enable => true,
  }
}
templates/profile/spark-defaults.conf.epp:
<%- | String $catalog_db_host, String $catalog_db_user, String $catalog_db_password, String $s3_warehouse_path | -%>
# This file is managed by Puppet. Do not edit manually.
# General Spark properties
spark.master yarn
spark.submit.deployMode client
# Iceberg SQL extensions and Catalog configuration
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.prod_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.prod_catalog.type jdbc
spark.sql.catalog.prod_catalog.uri jdbc:mariadb://<%= $catalog_db_host %>:3306/iceberg_catalog_db
spark.sql.catalog.prod_catalog.jdbc.user <%= $catalog_db_user %>
spark.sql.catalog.prod_catalog.jdbc.password <%= $catalog_db_password %>
spark.sql.catalog.prod_catalog.warehouse <%= $s3_warehouse_path %>
spark.sql.catalog.prod_catalog.io-impl org.apache.iceberg.aws.s3.S3FileIO
# AWS S3 configurations
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
# In production, use instance profiles or other secure credential mechanisms
# spark.hadoop.fs.s3a.access.key YOUR_ACCESS_KEY
# spark.hadoop.fs.s3a.secret.key YOUR_SECRET_KEY
This profile solves two problems at once: dependency management and dynamic configuration generation. The `puppet-archive` module is far more robust than an `exec` that shells out to `wget`, and the EPP template lets us pull secrets and environment-specific settings (such as the database host) out of the code and into Hiera.
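One thing this profile takes for granted is the spark-thrift-server systemd unit itself: the service resource manages it, but nothing above creates it. A hedged sketch of how the same profile could provide it follows; the unit contents, the paths, and the spark user are assumptions, and a systemd daemon-reload is omitted for brevity.

file { '/etc/systemd/system/spark-thrift-server.service':
  ensure  => file,
  mode    => '0644',
  # A heredoc keeps the (assumed) unit definition next to the resource that needs it.
  content => @("UNIT"),
    [Unit]
    Description=Spark Thrift Server (Iceberg SQL endpoint)
    After=network.target

    [Service]
    Type=forking
    User=spark
    ExecStart=/opt/spark/sbin/start-thriftserver.sh
    ExecStop=/opt/spark/sbin/stop-thriftserver.sh
    Restart=on-failure

    [Install]
    WantedBy=multi-user.target
    | UNIT
  notify  => Service['spark-thrift-server'],
}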
Step 4: Deploying the Weaviate Vector Database in Containers
Weaviate officially recommends deploying with Docker. Managing Docker containers from Puppet is a common pattern via the `puppetlabs-docker` module. It is much cleaner than installing binaries directly on the node, and version upgrades are far easier.
Our profile needs to do the following:
- Install Docker.
- Pull the specified Weaviate image.
- Use `docker-compose` (via a Puppet-managed `docker-compose.yml` file) to define and run the Weaviate service and its dependencies, such as the text vectorization model.
- Ensure data is persisted to a dedicated directory on the host.
manifests/profile/weaviate_node.pp:
# Class: profile::weaviate_node
# Description: Manages a Weaviate vector database node via Docker Compose.
#
class profile::weaviate_node {
  # 1. Ensure Docker and Docker Compose are installed and running.
  # This leverages the official puppetlabs-docker module.
  include docker
  include docker::compose

  # 2. Define the directory for Docker Compose file and Weaviate data
  $compose_dir = '/opt/weaviate'
  $data_dir    = '/var/lib/weaviate'

  file { [$compose_dir, $data_dir]:
    ensure => directory,
    owner  => 'root', # Or a dedicated user
    group  => 'root',
  }

  # 3. Define Weaviate version and configuration via Hiera
  $weaviate_version  = lookup('profile::weaviate::version', { default_value => '1.23.7' })
  $transformer_model = lookup('profile::weaviate::transformer_model', { default_value => 'sentence-transformers/all-MiniLM-L6-v2' })
  $cluster_hostname  = $facts['networking']['fqdn']

  # 4. Create the docker-compose.yml file from a template
file { "${compose_dir}/docker-compose.yml":
ensure => file,
owner => 'root',
group => 'root',
mode => '0644',
content => template('profile/weaviate-compose.yml.erb'),
notify => Docker::Compose['weaviate-stack'],
}
# 5. Define the Docker Compose service managed by Puppet
docker::compose { 'weaviate-stack':
compose_files => ["${compose_dir}/docker-compose.yml"],
ensure => 'present', # 'present' means up and running
require => File["${compose_dir}/docker-compose.yml"],
}
}
templates/profile/weaviate-compose.yml.erb:
# This file is managed by Puppet. Do not edit manually.
version: '3.4'
services:
  weaviate:
    command:
      - --host
      - 0.0.0.0
      - --port
      - '8080'
      - --scheme
      - http
    image: semitechnologies/weaviate:<%= @weaviate_version %>
    ports:
      - 8080:8080
    restart: on-failure:0
    volumes:
      - <%= @data_dir %>:/var/lib/weaviate
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers'
      ENABLE_MODULES: 'text2vec-transformers'
      TRANSFORMERS_INFERENCE_API: 'http://t2v-transformers:8080'
      CLUSTER_HOSTNAME: '<%= @cluster_hostname %>'
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2
    environment:
      ENABLE_CUDA: '0' # Set to '1' if GPU is available
      # You can control model loading here
      # SENTENCE_TRANSFORMERS_MODEL: '<%= @transformer_model %>'
This way we pin not only Weaviate's version and configuration but also the vectorization model container it depends on. Upgrading Weaviate or swapping the model is now a matter of changing Hiera data and letting the Puppet agent run once, and the whole process stays safe and controlled.
Final Result and Verification
With all of these profiles in place, we compose them into `feature_store` roles and apply them to the corresponding nodes. The Puppet agent runs automatically and converges every node to the state defined in our code.
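How each node receives its role depends on the classifier in use. With plain site.pp node definitions, a minimal sketch looks like the following; the hostname patterns and role names are illustrative and mirror the earlier role sketch.

manifests/site.pp (sketch):
node /^feature-store-online-\d+/ {
  include role::feature_store_online
}
node /^feature-store-offline-\d+/ {
  include role::feature_store_offline
}
node /^feature-store-vector-\d+/ {
  include role::feature_store_vector
}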
To verify that the whole system works as intended, we can write a simple Python script that simulates a complete feature lifecycle:
- Generate a feature and write its latest value to MariaDB.
- Append the feature's history (with timestamps) to an Apache Iceberg table.
- Generate a vector for the feature's metadata (its description text) and store it in Weaviate.
- Finally, run a similarity search against Weaviate to verify vector retrieval.
# verification_script.py
import mariadb
import weaviate
from pyiceberg.catalog import load_catalog
from sentence_transformers import SentenceTransformer
# --- Configuration ---
MARIADB_HOST = 'feature-store-online-node.internal'
MARIADB_USER = 'fs_user'
MARIADB_PASSWORD = '...' # Load from a secure source
MARIADB_DB = 'feature_store_online'
WEAVIATE_URL = 'http://feature-store-vector-node.internal:8080'
# Iceberg catalog access goes through the Spark Thrift Server (or a REST catalog).
# The Iceberg write below is only conceptualized, so the catalog handle is left
# commented out; point it at a real catalog endpoint to use it.
# ICEBERG_CATALOG = load_catalog(
#     "prod_catalog",
#     **{
#         "type": "rest",  # Or use JDBC if connecting directly
#         "uri": "http://iceberg-rest-catalog-url",
#     }
# )
# --- Feature Data ---
entity_id = "user_123"
feature_name = "user_7day_purchase_amount"
feature_value = 150.75
feature_description = "The total purchase amount of a user in the last 7 days."
# --- Main Logic ---
def main():
    # 1. Write to MariaDB (Online Store)
    try:
        conn = mariadb.connect(host=MARIADB_HOST, user=MARIADB_USER, password=MARIADB_PASSWORD, database=MARIADB_DB)
        cursor = conn.cursor()
        print("Writing to MariaDB...")
        cursor.execute(
            "INSERT INTO latest_features (entity_id, feature_name, feature_value) VALUES (?, ?, ?) "
            "ON DUPLICATE KEY UPDATE feature_value = VALUES(feature_value)",
            (entity_id, feature_name, str(feature_value).encode('utf-8'))
        )
        conn.commit()
        print("MariaDB write successful.")
    except mariadb.Error as e:
        print(f"Error connecting or writing to MariaDB: {e}")
    finally:
        if 'conn' in locals() and conn:
            conn.close()

    # 2. Append to Apache Iceberg (Offline Store)
    # This part is more complex, typically done via a Spark job.
    # Here we conceptualize the interaction.
    print("\nSimulating write to Iceberg via Spark/Trino...")
    # df = spark.createDataFrame([(entity_id, feature_name, feature_value, datetime.now())], ["id", "name", "value", "ts"])
    # df.writeTo("prod_catalog.feature_logs.historical_features").append()
    print("Iceberg append concept successful.")

    # 3. Write to Weaviate (Vector Store)
    try:
        client = weaviate.Client(WEAVIATE_URL)
        model = SentenceTransformer('all-MiniLM-L6-v2')
        vector = model.encode(feature_description).tolist()
        feature_object = {
            "name": feature_name,
            "description": feature_description,
        }
        print("\nWriting to Weaviate...")
        client.data_object.create(
            data_object=feature_object,
            class_name="FeatureMetadata",
            vector=vector
        )
        print("Weaviate write successful.")

        # 4. Search in Weaviate
        near_text = {"concepts": ["user spending habits"]}
        result = client.query.get("FeatureMetadata", ["name", "description"]).with_near_text(near_text).with_limit(2).do()
        print("\nWeaviate search result:")
        print(result)
    except Exception as e:
        print(f"Error interacting with Weaviate: {e}")


if __name__ == "__main__":
    main()
Although the script simplifies the Iceberg write (in practice that is done by a Spark job), it clearly demonstrates that every component deployed by Puppet is configured correctly and that they work together.
Limitations and Future Work
This Puppet-based approach excels at keeping the state of a fleet of virtual or physical machines consistent, and it drastically reduces both the complexity of environment management and the risk of human error. It is not a silver bullet, though.
Its elasticity is limited. If we need to scale Weaviate nodes or Spark workers up and down with load, Puppet's model starts to feel clumsy. In that scenario, migrating the stack to Kubernetes and refactoring each Puppet profile into a corresponding Kubernetes Operator would be the more modern, cloud-native choice: an Operator can handle richer lifecycle management such as autoscaling, self-healing, and rolling updates.
In addition, the current design focuses on infrastructure deployment (Day 1 operations) and does not yet adequately cover monitoring, log collection, and alerting (Day 2 operations). The next iteration will integrate Prometheus, Grafana, and OpenTelemetry, again using Puppet to deploy and configure these observability components, closing the loop from deployment through operations.